]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Drop explicit tests retries
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Thu, 13 Jan 2022 21:57:45 +0000 (22:57 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Thu, 13 Jan 2022 21:57:45 +0000 (22:57 +0100)
Now we just retry all failing tests automatically.

18 files changed:
psycopg/setup.py
pyproject.toml
tests/conftest.py
tests/constraints.txt
tests/pool/test_null_pool.py
tests/pool/test_null_pool_async.py
tests/pool/test_pool.py
tests/pool/test_pool_async.py
tests/test_concurrency.py
tests/test_concurrency_async.py
tests/test_copy.py
tests/test_copy_async.py
tests/test_cursor.py
tests/test_cursor_async.py
tests/test_dns_srv.py
tests/test_server_cursor.py
tests/test_server_cursor_async.py
tests/test_waiting.py

index 69c64ece35cd87e886ee90713ae08759e5560cad..3cc8b57f4b2872d5de2a20556cc8fe88574d6bac 100644 (file)
@@ -43,7 +43,6 @@ extras_require = {
         "pytest-asyncio >= 0.17",
         "pytest-cov >= 3.0",
         "pytest-randomly >= 3.10",
-        "tenacity >= 8.0",
     ],
     # Requirements needed for development
     "dev": [
index 2c8dfc050b5f3f7be8227ca3dd41ee5dde47125e..1d6eab8a145e09caccbdc7ce611e9fd6ced2be1d 100644 (file)
@@ -45,10 +45,6 @@ module = [
 ]
 ignore_missing_imports = true
 
-[[tool.mypy.overrides]]
-module = "tenacity.*"
-implicit_reexport = true
-
 [[tool.mypy.overrides]]
 module = "uvloop"
 ignore_missing_imports = true
index 184a9cbb1a3273263c340f4ac38341d1a30883ff..f9911aa16055fe5af8976d8ccd10d9a8b3146ed4 100644 (file)
@@ -1,6 +1,5 @@
 import sys
 import asyncio
-import inspect
 
 import pytest
 
@@ -54,21 +53,6 @@ def pytest_report_header(config):
     return [f"asyncio loop: {loop}"]
 
 
-@pytest.fixture
-def retries(request):
-    """Retry a block in a test a few times before giving up."""
-    import tenacity
-
-    if inspect.iscoroutinefunction(request.function):
-        return tenacity.AsyncRetrying(
-            reraise=True, stop=tenacity.stop_after_attempt(3)
-        )
-    else:
-        return tenacity.Retrying(
-            reraise=True, stop=tenacity.stop_after_attempt(3)
-        )
-
-
 def pytest_sessionstart(session):
     # Configure the async loop.
     loop = session.config.getoption("--loop")
index 6a1a4c0b5ebb31157b633306ef87aafe059c5a3d..a320beb410c91156b8fd86eaf8e943501c2371a0 100644 (file)
@@ -15,7 +15,6 @@ pytest == 6.2.5
 pytest-asyncio == 0.17.0
 pytest-cov == 3.0.0
 pytest-randomly == 3.10.0
-tenacity == 8.0.0
 
 # From the 'dev' extra
 black == 21.12b0
index 74b880f6cc0e5ee602d29bca2af96580ca776264..24d4ca4de79bc2e1fff56fd4d47ae10d0b907c3b 100644 (file)
@@ -285,7 +285,7 @@ def test_no_queue_timeout(deaf_port):
 
 @pytest.mark.slow
 @pytest.mark.timing
-def test_queue(dsn, retries):
+def test_queue(dsn):
     def worker(n):
         t0 = time()
         with p.connection() as conn:
@@ -295,23 +295,21 @@ def test_queue(dsn, retries):
         t1 = time()
         results.append((n, t1 - t0, pid))
 
-    for retry in retries:
-        with retry:
-            results: List[Tuple[int, float, int]] = []
-            with NullConnectionPool(dsn, max_size=2) as p:
-                p.wait()
-                ts = [Thread(target=worker, args=(i,)) for i in range(6)]
-                for t in ts:
-                    t.start()
-                for t in ts:
-                    t.join()
+    results: List[Tuple[int, float, int]] = []
+    with NullConnectionPool(dsn, max_size=2) as p:
+        p.wait()
+        ts = [Thread(target=worker, args=(i,)) for i in range(6)]
+        for t in ts:
+            t.start()
+        for t in ts:
+            t.join()
 
-            times = [item[1] for item in results]
-            want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
-            for got, want in zip(times, want_times):
-                assert got == pytest.approx(want, 0.2), times
+    times = [item[1] for item in results]
+    want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
+    for got, want in zip(times, want_times):
+        assert got == pytest.approx(want, 0.2), times
 
-            assert len(set(r[2] for r in results)) == 2, results
+    assert len(set(r[2] for r in results)) == 2, results
 
 
 @pytest.mark.slow
@@ -353,7 +351,7 @@ def test_queue_size(dsn):
 
 @pytest.mark.slow
 @pytest.mark.timing
-def test_queue_timeout(dsn, retries):
+def test_queue_timeout(dsn):
     def worker(n):
         t0 = time()
         try:
@@ -368,22 +366,20 @@ def test_queue_timeout(dsn, retries):
             t1 = time()
             results.append((n, t1 - t0, pid))
 
-    for retry in retries:
-        with retry:
-            results: List[Tuple[int, float, int]] = []
-            errors: List[Tuple[int, float, Exception]] = []
+    results: List[Tuple[int, float, int]] = []
+    errors: List[Tuple[int, float, Exception]] = []
 
-            with NullConnectionPool(dsn, max_size=2, timeout=0.1) as p:
-                ts = [Thread(target=worker, args=(i,)) for i in range(4)]
-                for t in ts:
-                    t.start()
-                for t in ts:
-                    t.join()
+    with NullConnectionPool(dsn, max_size=2, timeout=0.1) as p:
+        ts = [Thread(target=worker, args=(i,)) for i in range(4)]
+        for t in ts:
+            t.start()
+        for t in ts:
+            t.join()
 
-            assert len(results) == 2
-            assert len(errors) == 2
-            for e in errors:
-                assert 0.1 < e[1] < 0.15
+    assert len(results) == 2
+    assert len(errors) == 2
+    for e in errors:
+        assert 0.1 < e[1] < 0.15
 
 
 @pytest.mark.slow
@@ -415,7 +411,7 @@ def test_dead_client(dsn):
 
 @pytest.mark.slow
 @pytest.mark.timing
-def test_queue_timeout_override(dsn, retries):
+def test_queue_timeout_override(dsn):
     def worker(n):
         t0 = time()
         timeout = 0.25 if n == 3 else None
@@ -431,22 +427,20 @@ def test_queue_timeout_override(dsn, retries):
             t1 = time()
             results.append((n, t1 - t0, pid))
 
-    for retry in retries:
-        with retry:
-            results: List[Tuple[int, float, int]] = []
-            errors: List[Tuple[int, float, Exception]] = []
+    results: List[Tuple[int, float, int]] = []
+    errors: List[Tuple[int, float, Exception]] = []
 
-            with NullConnectionPool(dsn, max_size=2, timeout=0.1) as p:
-                ts = [Thread(target=worker, args=(i,)) for i in range(4)]
-                for t in ts:
-                    t.start()
-                for t in ts:
-                    t.join()
+    with NullConnectionPool(dsn, max_size=2, timeout=0.1) as p:
+        ts = [Thread(target=worker, args=(i,)) for i in range(4)]
+        for t in ts:
+            t.start()
+        for t in ts:
+            t.join()
 
-            assert len(results) == 3
-            assert len(errors) == 1
-            for e in errors:
-                assert 0.1 < e[1] < 0.15
+    assert len(results) == 3
+    assert len(errors) == 1
+    for e in errors:
+        assert 0.1 < e[1] < 0.15
 
 
 def test_broken_reconnect(dsn):
@@ -854,7 +848,7 @@ def test_stats_measures(dsn):
 
 @pytest.mark.slow
 @pytest.mark.timing
-def test_stats_usage(dsn, retries):
+def test_stats_usage(dsn):
     def worker(n):
         try:
             with p.connection(timeout=0.3) as conn:
@@ -862,33 +856,31 @@ def test_stats_usage(dsn, retries):
         except PoolTimeout:
             pass
 
-    for retry in retries:
-        with retry:
-            with NullConnectionPool(dsn, max_size=3) as p:
-                p.wait(2.0)
-
-                ts = [Thread(target=worker, args=(i,)) for i in range(7)]
-                for t in ts:
-                    t.start()
-                for t in ts:
-                    t.join()
-                stats = p.get_stats()
-                assert stats["requests_num"] == 7
-                assert stats["requests_queued"] == 4
-                assert 850 <= stats["requests_wait_ms"] <= 950
-                assert stats["requests_errors"] == 1
-                assert 1150 <= stats["usage_ms"] <= 1250
-                assert stats.get("returns_bad", 0) == 0
-
-                with p.connection() as conn:
-                    conn.close()
-                p.wait()
-                stats = p.pop_stats()
-                assert stats["requests_num"] == 8
-                assert stats["returns_bad"] == 1
-                with p.connection():
-                    pass
-                assert p.get_stats()["requests_num"] == 1
+    with NullConnectionPool(dsn, max_size=3) as p:
+        p.wait(2.0)
+
+        ts = [Thread(target=worker, args=(i,)) for i in range(7)]
+        for t in ts:
+            t.start()
+        for t in ts:
+            t.join()
+        stats = p.get_stats()
+        assert stats["requests_num"] == 7
+        assert stats["requests_queued"] == 4
+        assert 850 <= stats["requests_wait_ms"] <= 950
+        assert stats["requests_errors"] == 1
+        assert 1150 <= stats["usage_ms"] <= 1250
+        assert stats.get("returns_bad", 0) == 0
+
+        with p.connection() as conn:
+            conn.close()
+        p.wait()
+        stats = p.pop_stats()
+        assert stats["requests_num"] == 8
+        assert stats["returns_bad"] == 1
+        with p.connection():
+            pass
+        assert p.get_stats()["requests_num"] == 1
 
 
 @pytest.mark.slow
index fa31984b8915f6ad1ae257363d90683caeb3ea8e..824ed21367bfa2db73104b5a7bb305ca89691786 100644 (file)
@@ -293,7 +293,7 @@ async def test_no_queue_timeout(deaf_port):
 
 @pytest.mark.slow
 @pytest.mark.timing
-async def test_queue(dsn, retries):
+async def test_queue(dsn):
     async def worker(n):
         t0 = time()
         async with p.connection() as conn:
@@ -304,20 +304,18 @@ async def test_queue(dsn, retries):
         t1 = time()
         results.append((n, t1 - t0, pid))
 
-    async for retry in retries:
-        with retry:
-            results: List[Tuple[int, float, int]] = []
-            async with AsyncNullConnectionPool(dsn, max_size=2) as p:
-                await p.wait()
-                ts = [create_task(worker(i)) for i in range(6)]
-                await asyncio.gather(*ts)
+    results: List[Tuple[int, float, int]] = []
+    async with AsyncNullConnectionPool(dsn, max_size=2) as p:
+        await p.wait()
+        ts = [create_task(worker(i)) for i in range(6)]
+        await asyncio.gather(*ts)
 
-            times = [item[1] for item in results]
-            want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
-            for got, want in zip(times, want_times):
-                assert got == pytest.approx(want, 0.2), times
+    times = [item[1] for item in results]
+    want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
+    for got, want in zip(times, want_times):
+        assert got == pytest.approx(want, 0.2), times
 
-            assert len(set(r[2] for r in results)) == 2, results
+    assert len(set(r[2] for r in results)) == 2, results
 
 
 @pytest.mark.slow
@@ -355,7 +353,7 @@ async def test_queue_size(dsn):
 
 @pytest.mark.slow
 @pytest.mark.timing
-async def test_queue_timeout(dsn, retries):
+async def test_queue_timeout(dsn):
     async def worker(n):
         t0 = time()
         try:
@@ -371,21 +369,17 @@ async def test_queue_timeout(dsn, retries):
             t1 = time()
             results.append((n, t1 - t0, pid))
 
-    async for retry in retries:
-        with retry:
-            results: List[Tuple[int, float, int]] = []
-            errors: List[Tuple[int, float, Exception]] = []
+    results: List[Tuple[int, float, int]] = []
+    errors: List[Tuple[int, float, Exception]] = []
 
-            async with AsyncNullConnectionPool(
-                dsn, max_size=2, timeout=0.1
-            ) as p:
-                ts = [create_task(worker(i)) for i in range(4)]
-                await asyncio.gather(*ts)
+    async with AsyncNullConnectionPool(dsn, max_size=2, timeout=0.1) as p:
+        ts = [create_task(worker(i)) for i in range(4)]
+        await asyncio.gather(*ts)
 
-            assert len(results) == 2
-            assert len(errors) == 2
-            for e in errors:
-                assert 0.1 < e[1] < 0.15
+    assert len(results) == 2
+    assert len(errors) == 2
+    for e in errors:
+        assert 0.1 < e[1] < 0.15
 
 
 @pytest.mark.slow
@@ -414,7 +408,7 @@ async def test_dead_client(dsn):
 
 @pytest.mark.slow
 @pytest.mark.timing
-async def test_queue_timeout_override(dsn, retries):
+async def test_queue_timeout_override(dsn):
     async def worker(n):
         t0 = time()
         timeout = 0.25 if n == 3 else None
@@ -431,21 +425,17 @@ async def test_queue_timeout_override(dsn, retries):
             t1 = time()
             results.append((n, t1 - t0, pid))
 
-    async for retry in retries:
-        with retry:
-            results: List[Tuple[int, float, int]] = []
-            errors: List[Tuple[int, float, Exception]] = []
+    results: List[Tuple[int, float, int]] = []
+    errors: List[Tuple[int, float, Exception]] = []
 
-            async with AsyncNullConnectionPool(
-                dsn, max_size=2, timeout=0.1
-            ) as p:
-                ts = [create_task(worker(i)) for i in range(4)]
-                await asyncio.gather(*ts)
+    async with AsyncNullConnectionPool(dsn, max_size=2, timeout=0.1) as p:
+        ts = [create_task(worker(i)) for i in range(4)]
+        await asyncio.gather(*ts)
 
-            assert len(results) == 3
-            assert len(errors) == 1
-            for e in errors:
-                assert 0.1 < e[1] < 0.15
+    assert len(results) == 3
+    assert len(errors) == 1
+    for e in errors:
+        assert 0.1 < e[1] < 0.15
 
 
 async def test_broken_reconnect(dsn):
@@ -820,7 +810,7 @@ async def test_stats_measures(dsn):
 
 @pytest.mark.slow
 @pytest.mark.timing
-async def test_stats_usage(dsn, retries):
+async def test_stats_usage(dsn):
     async def worker(n):
         try:
             async with p.connection(timeout=0.3) as conn:
@@ -828,30 +818,28 @@ async def test_stats_usage(dsn, retries):
         except PoolTimeout:
             pass
 
-    async for retry in retries:
-        with retry:
-            async with AsyncNullConnectionPool(dsn, max_size=3) as p:
-                await p.wait(2.0)
-
-                ts = [create_task(worker(i)) for i in range(7)]
-                await asyncio.gather(*ts)
-                stats = p.get_stats()
-                assert stats["requests_num"] == 7
-                assert stats["requests_queued"] == 4
-                assert 850 <= stats["requests_wait_ms"] <= 950
-                assert stats["requests_errors"] == 1
-                assert 1150 <= stats["usage_ms"] <= 1250
-                assert stats.get("returns_bad", 0) == 0
-
-                async with p.connection() as conn:
-                    await conn.close()
-                await p.wait()
-                stats = p.pop_stats()
-                assert stats["requests_num"] == 8
-                assert stats["returns_bad"] == 1
-                async with p.connection():
-                    pass
-                assert p.get_stats()["requests_num"] == 1
+    async with AsyncNullConnectionPool(dsn, max_size=3) as p:
+        await p.wait(2.0)
+
+        ts = [create_task(worker(i)) for i in range(7)]
+        await asyncio.gather(*ts)
+        stats = p.get_stats()
+        assert stats["requests_num"] == 7
+        assert stats["requests_queued"] == 4
+        assert 850 <= stats["requests_wait_ms"] <= 950
+        assert stats["requests_errors"] == 1
+        assert 1150 <= stats["usage_ms"] <= 1250
+        assert stats.get("returns_bad", 0) == 0
+
+        async with p.connection() as conn:
+            await conn.close()
+        await p.wait()
+        stats = p.pop_stats()
+        assert stats["requests_num"] == 8
+        assert stats["returns_bad"] == 1
+        async with p.connection():
+            pass
+        assert p.get_stats()["requests_num"] == 1
 
 
 @pytest.mark.slow
index a7634b459779597a34060634767a85d5625f79e3..26525d0c53ff2d269dd9165a7d80ab243df4c15d 100644 (file)
@@ -101,7 +101,7 @@ def test_connection_not_lost(dsn):
 
 @pytest.mark.slow
 @pytest.mark.timing
-def test_concurrent_filling(dsn, monkeypatch, retries):
+def test_concurrent_filling(dsn, monkeypatch):
     delay_connection(monkeypatch, 0.1)
 
     def add_time(self, conn):
@@ -111,17 +111,15 @@ def test_concurrent_filling(dsn, monkeypatch, retries):
     add_orig = pool.ConnectionPool._add_to_pool
     monkeypatch.setattr(pool.ConnectionPool, "_add_to_pool", add_time)
 
-    for retry in retries:
-        with retry:
-            times: List[float] = []
-            t0 = time()
+    times: List[float] = []
+    t0 = time()
 
-            with pool.ConnectionPool(dsn, min_size=5, num_workers=2) as p:
-                p.wait(1.0)
-                want_times = [0.1, 0.1, 0.2, 0.2, 0.3]
-                assert len(times) == len(want_times)
-                for got, want in zip(times, want_times):
-                    assert got == pytest.approx(want, 0.1), times
+    with pool.ConnectionPool(dsn, min_size=5, num_workers=2) as p:
+        p.wait(1.0)
+        want_times = [0.1, 0.1, 0.2, 0.2, 0.3]
+        assert len(times) == len(want_times)
+        for got, want in zip(times, want_times):
+            assert got == pytest.approx(want, 0.1), times
 
 
 @pytest.mark.slow
@@ -296,7 +294,7 @@ def test_reset_broken(dsn, caplog):
 
 @pytest.mark.slow
 @pytest.mark.timing
-def test_queue(dsn, retries):
+def test_queue(dsn):
     def worker(n):
         t0 = time()
         with p.connection() as conn:
@@ -306,23 +304,21 @@ def test_queue(dsn, retries):
         t1 = time()
         results.append((n, t1 - t0, pid))
 
-    for retry in retries:
-        with retry:
-            results: List[Tuple[int, float, int]] = []
-            with pool.ConnectionPool(dsn, min_size=2) as p:
-                p.wait()
-                ts = [Thread(target=worker, args=(i,)) for i in range(6)]
-                for t in ts:
-                    t.start()
-                for t in ts:
-                    t.join()
+    results: List[Tuple[int, float, int]] = []
+    with pool.ConnectionPool(dsn, min_size=2) as p:
+        p.wait()
+        ts = [Thread(target=worker, args=(i,)) for i in range(6)]
+        for t in ts:
+            t.start()
+        for t in ts:
+            t.join()
 
-            times = [item[1] for item in results]
-            want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
-            for got, want in zip(times, want_times):
-                assert got == pytest.approx(want, 0.1), times
+    times = [item[1] for item in results]
+    want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
+    for got, want in zip(times, want_times):
+        assert got == pytest.approx(want, 0.1), times
 
-            assert len(set(r[2] for r in results)) == 2, results
+    assert len(set(r[2] for r in results)) == 2, results
 
 
 @pytest.mark.slow
@@ -364,7 +360,7 @@ def test_queue_size(dsn):
 
 @pytest.mark.slow
 @pytest.mark.timing
-def test_queue_timeout(dsn, retries):
+def test_queue_timeout(dsn):
     def worker(n):
         t0 = time()
         try:
@@ -379,22 +375,20 @@ def test_queue_timeout(dsn, retries):
             t1 = time()
             results.append((n, t1 - t0, pid))
 
-    for retry in retries:
-        with retry:
-            results: List[Tuple[int, float, int]] = []
-            errors: List[Tuple[int, float, Exception]] = []
+    results: List[Tuple[int, float, int]] = []
+    errors: List[Tuple[int, float, Exception]] = []
 
-            with pool.ConnectionPool(dsn, min_size=2, timeout=0.1) as p:
-                ts = [Thread(target=worker, args=(i,)) for i in range(4)]
-                for t in ts:
-                    t.start()
-                for t in ts:
-                    t.join()
+    with pool.ConnectionPool(dsn, min_size=2, timeout=0.1) as p:
+        ts = [Thread(target=worker, args=(i,)) for i in range(4)]
+        for t in ts:
+            t.start()
+        for t in ts:
+            t.join()
 
-            assert len(results) == 2
-            assert len(errors) == 2
-            for e in errors:
-                assert 0.1 < e[1] < 0.15
+    assert len(results) == 2
+    assert len(errors) == 2
+    for e in errors:
+        assert 0.1 < e[1] < 0.15
 
 
 @pytest.mark.slow
@@ -427,7 +421,7 @@ def test_dead_client(dsn):
 
 @pytest.mark.slow
 @pytest.mark.timing
-def test_queue_timeout_override(dsn, retries):
+def test_queue_timeout_override(dsn):
     def worker(n):
         t0 = time()
         timeout = 0.25 if n == 3 else None
@@ -443,22 +437,20 @@ def test_queue_timeout_override(dsn, retries):
             t1 = time()
             results.append((n, t1 - t0, pid))
 
-    for retry in retries:
-        with retry:
-            results: List[Tuple[int, float, int]] = []
-            errors: List[Tuple[int, float, Exception]] = []
+    results: List[Tuple[int, float, int]] = []
+    errors: List[Tuple[int, float, Exception]] = []
 
-            with pool.ConnectionPool(dsn, min_size=2, timeout=0.1) as p:
-                ts = [Thread(target=worker, args=(i,)) for i in range(4)]
-                for t in ts:
-                    t.start()
-                for t in ts:
-                    t.join()
+    with pool.ConnectionPool(dsn, min_size=2, timeout=0.1) as p:
+        ts = [Thread(target=worker, args=(i,)) for i in range(4)]
+        for t in ts:
+            t.start()
+        for t in ts:
+            t.join()
 
-            assert len(results) == 3
-            assert len(errors) == 1
-            for e in errors:
-                assert 0.1 < e[1] < 0.15
+    assert len(results) == 3
+    assert len(errors) == 1
+    for e in errors:
+        assert 0.1 < e[1] < 0.15
 
 
 def test_broken_reconnect(dsn):
@@ -794,7 +786,7 @@ def test_reopen(dsn):
         (0, [0.35, 0.45, 0.55, 0.60, 0.65, 0.70, 0.80, 0.85]),
     ],
 )
-def test_grow(dsn, monkeypatch, retries, min_size, want_times):
+def test_grow(dsn, monkeypatch, min_size, want_times):
     delay_connection(monkeypatch, 0.1)
 
     def worker(n):
@@ -804,26 +796,21 @@ def test_grow(dsn, monkeypatch, retries, min_size, want_times):
         t1 = time()
         results.append((n, t1 - t0))
 
-    for retry in retries:
-        with retry:
-            with pool.ConnectionPool(
-                dsn, min_size=min_size, max_size=4, num_workers=3
-            ) as p:
-                p.wait(1.0)
-                results: List[Tuple[int, float]] = []
+    with pool.ConnectionPool(
+        dsn, min_size=min_size, max_size=4, num_workers=3
+    ) as p:
+        p.wait(1.0)
+        results: List[Tuple[int, float]] = []
 
-                ts = [
-                    Thread(target=worker, args=(i,))
-                    for i in range(len(want_times))
-                ]
-                for t in ts:
-                    t.start()
-                for t in ts:
-                    t.join()
+        ts = [Thread(target=worker, args=(i,)) for i in range(len(want_times))]
+        for t in ts:
+            t.start()
+        for t in ts:
+            t.join()
 
-            times = [item[1] for item in results]
-            for got, want in zip(times, want_times):
-                assert got == pytest.approx(want, 0.1), times
+    times = [item[1] for item in results]
+    for got, want in zip(times, want_times):
+        assert got == pytest.approx(want, 0.1), times
 
 
 @pytest.mark.slow
@@ -862,7 +849,7 @@ def test_shrink(dsn, monkeypatch):
 
 
 @pytest.mark.slow
-def test_reconnect(proxy, caplog, monkeypatch, retries):
+def test_reconnect(proxy, caplog, monkeypatch):
     caplog.set_level(logging.WARNING, logger="psycopg.pool")
 
     assert pool.base.ConnectionAttempt.INITIAL_DELAY == 1.0
@@ -870,36 +857,32 @@ def test_reconnect(proxy, caplog, monkeypatch, retries):
     monkeypatch.setattr(pool.base.ConnectionAttempt, "INITIAL_DELAY", 0.1)
     monkeypatch.setattr(pool.base.ConnectionAttempt, "DELAY_JITTER", 0.0)
 
-    for retry in retries:
-        with retry:
-            caplog.clear()
-            proxy.start()
-            with pool.ConnectionPool(proxy.client_dsn, min_size=1) as p:
-                p.wait(2.0)
-                proxy.stop()
-
-                with pytest.raises(psycopg.OperationalError):
-                    with p.connection() as conn:
-                        conn.execute("select 1")
-
-                sleep(1.0)
-                proxy.start()
-                p.wait()
-
-                with p.connection() as conn:
-                    conn.execute("select 1")
-
-            assert "BAD" in caplog.messages[0]
-            times = [rec.created for rec in caplog.records]
-            assert times[1] - times[0] < 0.05
-            deltas = [
-                times[i + 1] - times[i] for i in range(1, len(times) - 1)
-            ]
-            assert len(deltas) == 3
-            want = 0.1
-            for delta in deltas:
-                assert delta == pytest.approx(want, 0.05), deltas
-                want *= 2
+    caplog.clear()
+    proxy.start()
+    with pool.ConnectionPool(proxy.client_dsn, min_size=1) as p:
+        p.wait(2.0)
+        proxy.stop()
+
+        with pytest.raises(psycopg.OperationalError):
+            with p.connection() as conn:
+                conn.execute("select 1")
+
+        sleep(1.0)
+        proxy.start()
+        p.wait()
+
+        with p.connection() as conn:
+            conn.execute("select 1")
+
+    assert "BAD" in caplog.messages[0]
+    times = [rec.created for rec in caplog.records]
+    assert times[1] - times[0] < 0.05
+    deltas = [times[i + 1] - times[i] for i in range(1, len(times) - 1)]
+    assert len(deltas) == 3
+    want = 0.1
+    for delta in deltas:
+        assert delta == pytest.approx(want, 0.05), deltas
+        want *= 2
 
 
 @pytest.mark.slow
@@ -943,18 +926,16 @@ def test_reconnect_failure(proxy):
 
 
 @pytest.mark.slow
-def test_uniform_use(dsn, retries):
-    for retry in retries:
-        with retry:
-            with pool.ConnectionPool(dsn, min_size=4) as p:
-                counts = Counter[int]()
-                for i in range(8):
-                    with p.connection() as conn:
-                        sleep(0.1)
-                        counts[id(conn)] += 1
+def test_uniform_use(dsn):
+    with pool.ConnectionPool(dsn, min_size=4) as p:
+        counts = Counter[int]()
+        for i in range(8):
+            with p.connection() as conn:
+                sleep(0.1)
+                counts[id(conn)] += 1
 
-            assert len(counts) == 4
-            assert set(counts.values()) == set([2])
+    assert len(counts) == 4
+    assert set(counts.values()) == set([2])
 
 
 @pytest.mark.slow
@@ -1101,7 +1082,7 @@ def test_stats_measures(dsn):
 
 @pytest.mark.slow
 @pytest.mark.timing
-def test_stats_usage(dsn, retries):
+def test_stats_usage(dsn):
     def worker(n):
         try:
             with p.connection(timeout=0.3) as conn:
@@ -1109,33 +1090,31 @@ def test_stats_usage(dsn, retries):
         except pool.PoolTimeout:
             pass
 
-    for retry in retries:
-        with retry:
-            with pool.ConnectionPool(dsn, min_size=3) as p:
-                p.wait(2.0)
-
-                ts = [Thread(target=worker, args=(i,)) for i in range(7)]
-                for t in ts:
-                    t.start()
-                for t in ts:
-                    t.join()
-                stats = p.get_stats()
-                assert stats["requests_num"] == 7
-                assert stats["requests_queued"] == 4
-                assert 850 <= stats["requests_wait_ms"] <= 950
-                assert stats["requests_errors"] == 1
-                assert 1150 <= stats["usage_ms"] <= 1250
-                assert stats.get("returns_bad", 0) == 0
-
-                with p.connection() as conn:
-                    conn.close()
-                p.wait()
-                stats = p.pop_stats()
-                assert stats["requests_num"] == 8
-                assert stats["returns_bad"] == 1
-                with p.connection():
-                    pass
-                assert p.get_stats()["requests_num"] == 1
+    with pool.ConnectionPool(dsn, min_size=3) as p:
+        p.wait(2.0)
+
+        ts = [Thread(target=worker, args=(i,)) for i in range(7)]
+        for t in ts:
+            t.start()
+        for t in ts:
+            t.join()
+        stats = p.get_stats()
+        assert stats["requests_num"] == 7
+        assert stats["requests_queued"] == 4
+        assert 850 <= stats["requests_wait_ms"] <= 950
+        assert stats["requests_errors"] == 1
+        assert 1150 <= stats["usage_ms"] <= 1250
+        assert stats.get("returns_bad", 0) == 0
+
+        with p.connection() as conn:
+            conn.close()
+        p.wait()
+        stats = p.pop_stats()
+        assert stats["requests_num"] == 8
+        assert stats["returns_bad"] == 1
+        with p.connection():
+            pass
+        assert p.get_stats()["requests_num"] == 1
 
 
 @pytest.mark.slow
index eb84868752c969ab0451705bf7702054c6323560..7b663bea6f3ace2d5be7a0e9aac8dc9b5c92f9a1 100644 (file)
@@ -96,7 +96,7 @@ async def test_connection_not_lost(dsn):
 
 @pytest.mark.slow
 @pytest.mark.timing
-async def test_concurrent_filling(dsn, monkeypatch, retries):
+async def test_concurrent_filling(dsn, monkeypatch):
     delay_connection(monkeypatch, 0.1)
 
     async def add_time(self, conn):
@@ -106,19 +106,15 @@ async def test_concurrent_filling(dsn, monkeypatch, retries):
     add_orig = pool.AsyncConnectionPool._add_to_pool
     monkeypatch.setattr(pool.AsyncConnectionPool, "_add_to_pool", add_time)
 
-    async for retry in retries:
-        with retry:
-            times: List[float] = []
-            t0 = time()
+    times: List[float] = []
+    t0 = time()
 
-            async with pool.AsyncConnectionPool(
-                dsn, min_size=5, num_workers=2
-            ) as p:
-                await p.wait(1.0)
-                want_times = [0.1, 0.1, 0.2, 0.2, 0.3]
-                assert len(times) == len(want_times)
-                for got, want in zip(times, want_times):
-                    assert got == pytest.approx(want, 0.1), times
+    async with pool.AsyncConnectionPool(dsn, min_size=5, num_workers=2) as p:
+        await p.wait(1.0)
+        want_times = [0.1, 0.1, 0.2, 0.2, 0.3]
+        assert len(times) == len(want_times)
+        for got, want in zip(times, want_times):
+            assert got == pytest.approx(want, 0.1), times
 
 
 @pytest.mark.slow
@@ -303,7 +299,7 @@ async def test_reset_broken(dsn, caplog):
 
 @pytest.mark.slow
 @pytest.mark.timing
-async def test_queue(dsn, retries):
+async def test_queue(dsn):
     async def worker(n):
         t0 = time()
         async with p.connection() as conn:
@@ -314,20 +310,18 @@ async def test_queue(dsn, retries):
         t1 = time()
         results.append((n, t1 - t0, pid))
 
-    async for retry in retries:
-        with retry:
-            results: List[Tuple[int, float, int]] = []
-            async with pool.AsyncConnectionPool(dsn, min_size=2) as p:
-                await p.wait()
-                ts = [create_task(worker(i)) for i in range(6)]
-                await asyncio.gather(*ts)
+    results: List[Tuple[int, float, int]] = []
+    async with pool.AsyncConnectionPool(dsn, min_size=2) as p:
+        await p.wait()
+        ts = [create_task(worker(i)) for i in range(6)]
+        await asyncio.gather(*ts)
 
-            times = [item[1] for item in results]
-            want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
-            for got, want in zip(times, want_times):
-                assert got == pytest.approx(want, 0.1), times
+    times = [item[1] for item in results]
+    want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
+    for got, want in zip(times, want_times):
+        assert got == pytest.approx(want, 0.1), times
 
-            assert len(set(r[2] for r in results)) == 2, results
+    assert len(set(r[2] for r in results)) == 2, results
 
 
 @pytest.mark.slow
@@ -365,7 +359,7 @@ async def test_queue_size(dsn):
 
 @pytest.mark.slow
 @pytest.mark.timing
-async def test_queue_timeout(dsn, retries):
+async def test_queue_timeout(dsn):
     async def worker(n):
         t0 = time()
         try:
@@ -381,21 +375,17 @@ async def test_queue_timeout(dsn, retries):
             t1 = time()
             results.append((n, t1 - t0, pid))
 
-    async for retry in retries:
-        with retry:
-            results: List[Tuple[int, float, int]] = []
-            errors: List[Tuple[int, float, Exception]] = []
+    results: List[Tuple[int, float, int]] = []
+    errors: List[Tuple[int, float, Exception]] = []
 
-            async with pool.AsyncConnectionPool(
-                dsn, min_size=2, timeout=0.1
-            ) as p:
-                ts = [create_task(worker(i)) for i in range(4)]
-                await asyncio.gather(*ts)
+    async with pool.AsyncConnectionPool(dsn, min_size=2, timeout=0.1) as p:
+        ts = [create_task(worker(i)) for i in range(4)]
+        await asyncio.gather(*ts)
 
-            assert len(results) == 2
-            assert len(errors) == 2
-            for e in errors:
-                assert 0.1 < e[1] < 0.15
+    assert len(results) == 2
+    assert len(errors) == 2
+    for e in errors:
+        assert 0.1 < e[1] < 0.15
 
 
 @pytest.mark.slow
@@ -425,7 +415,7 @@ async def test_dead_client(dsn):
 
 @pytest.mark.slow
 @pytest.mark.timing
-async def test_queue_timeout_override(dsn, retries):
+async def test_queue_timeout_override(dsn):
     async def worker(n):
         t0 = time()
         timeout = 0.25 if n == 3 else None
@@ -442,21 +432,17 @@ async def test_queue_timeout_override(dsn, retries):
             t1 = time()
             results.append((n, t1 - t0, pid))
 
-    async for retry in retries:
-        with retry:
-            results: List[Tuple[int, float, int]] = []
-            errors: List[Tuple[int, float, Exception]] = []
+    results: List[Tuple[int, float, int]] = []
+    errors: List[Tuple[int, float, Exception]] = []
 
-            async with pool.AsyncConnectionPool(
-                dsn, min_size=2, timeout=0.1
-            ) as p:
-                ts = [create_task(worker(i)) for i in range(4)]
-                await asyncio.gather(*ts)
+    async with pool.AsyncConnectionPool(dsn, min_size=2, timeout=0.1) as p:
+        ts = [create_task(worker(i)) for i in range(4)]
+        await asyncio.gather(*ts)
 
-            assert len(results) == 3
-            assert len(errors) == 1
-            for e in errors:
-                assert 0.1 < e[1] < 0.15
+    assert len(results) == 3
+    assert len(errors) == 1
+    for e in errors:
+        assert 0.1 < e[1] < 0.15
 
 
 async def test_broken_reconnect(dsn):
@@ -769,7 +755,7 @@ async def test_reopen(dsn):
         (0, [0.35, 0.45, 0.55, 0.60, 0.65, 0.70, 0.80, 0.85]),
     ],
 )
-async def test_grow(dsn, monkeypatch, retries, min_size, want_times):
+async def test_grow(dsn, monkeypatch, min_size, want_times):
     delay_connection(monkeypatch, 0.1)
 
     async def worker(n):
@@ -779,21 +765,19 @@ async def test_grow(dsn, monkeypatch, retries, min_size, want_times):
         t1 = time()
         results.append((n, t1 - t0))
 
-    async for retry in retries:
-        with retry:
-            async with pool.AsyncConnectionPool(
-                dsn, min_size=min_size, max_size=4, num_workers=3
-            ) as p:
-                await p.wait(1.0)
-                ts = []
-                results: List[Tuple[int, float]] = []
+    async with pool.AsyncConnectionPool(
+        dsn, min_size=min_size, max_size=4, num_workers=3
+    ) as p:
+        await p.wait(1.0)
+        ts = []
+        results: List[Tuple[int, float]] = []
 
-                ts = [create_task(worker(i)) for i in range(len(want_times))]
-                await asyncio.gather(*ts)
+        ts = [create_task(worker(i)) for i in range(len(want_times))]
+        await asyncio.gather(*ts)
 
-            times = [item[1] for item in results]
-            for got, want in zip(times, want_times):
-                assert got == pytest.approx(want, 0.1), times
+    times = [item[1] for item in results]
+    for got, want in zip(times, want_times):
+        assert got == pytest.approx(want, 0.1), times
 
 
 @pytest.mark.slow
@@ -832,7 +816,7 @@ async def test_shrink(dsn, monkeypatch):
 
 
 @pytest.mark.slow
-async def test_reconnect(proxy, caplog, monkeypatch, retries):
+async def test_reconnect(proxy, caplog, monkeypatch):
     caplog.set_level(logging.WARNING, logger="psycopg.pool")
 
     assert pool.base.ConnectionAttempt.INITIAL_DELAY == 1.0
@@ -840,38 +824,32 @@ async def test_reconnect(proxy, caplog, monkeypatch, retries):
     monkeypatch.setattr(pool.base.ConnectionAttempt, "INITIAL_DELAY", 0.1)
     monkeypatch.setattr(pool.base.ConnectionAttempt, "DELAY_JITTER", 0.0)
 
-    async for retry in retries:
-        with retry:
-            caplog.clear()
-            proxy.start()
-            async with pool.AsyncConnectionPool(
-                proxy.client_dsn, min_size=1
-            ) as p:
-                await p.wait(2.0)
-                proxy.stop()
-
-                with pytest.raises(psycopg.OperationalError):
-                    async with p.connection() as conn:
-                        await conn.execute("select 1")
-
-                await asyncio.sleep(1.0)
-                proxy.start()
-                await p.wait()
-
-                async with p.connection() as conn:
-                    await conn.execute("select 1")
-
-            assert "BAD" in caplog.messages[0]
-            times = [rec.created for rec in caplog.records]
-            assert times[1] - times[0] < 0.05
-            deltas = [
-                times[i + 1] - times[i] for i in range(1, len(times) - 1)
-            ]
-            assert len(deltas) == 3
-            want = 0.1
-            for delta in deltas:
-                assert delta == pytest.approx(want, 0.05), deltas
-                want *= 2
+    caplog.clear()
+    proxy.start()
+    async with pool.AsyncConnectionPool(proxy.client_dsn, min_size=1) as p:
+        await p.wait(2.0)
+        proxy.stop()
+
+        with pytest.raises(psycopg.OperationalError):
+            async with p.connection() as conn:
+                await conn.execute("select 1")
+
+        await asyncio.sleep(1.0)
+        proxy.start()
+        await p.wait()
+
+        async with p.connection() as conn:
+            await conn.execute("select 1")
+
+    assert "BAD" in caplog.messages[0]
+    times = [rec.created for rec in caplog.records]
+    assert times[1] - times[0] < 0.05
+    deltas = [times[i + 1] - times[i] for i in range(1, len(times) - 1)]
+    assert len(deltas) == 3
+    want = 0.1
+    for delta in deltas:
+        assert delta == pytest.approx(want, 0.05), deltas
+        want *= 2
 
 
 @pytest.mark.slow
@@ -915,18 +893,16 @@ async def test_reconnect_failure(proxy):
 
 
 @pytest.mark.slow
-async def test_uniform_use(dsn, retries):
-    async for retry in retries:
-        with retry:
-            async with pool.AsyncConnectionPool(dsn, min_size=4) as p:
-                counts = Counter[int]()
-                for i in range(8):
-                    async with p.connection() as conn:
-                        await asyncio.sleep(0.1)
-                        counts[id(conn)] += 1
+async def test_uniform_use(dsn):
+    async with pool.AsyncConnectionPool(dsn, min_size=4) as p:
+        counts = Counter[int]()
+        for i in range(8):
+            async with p.connection() as conn:
+                await asyncio.sleep(0.1)
+                counts[id(conn)] += 1
 
-            assert len(counts) == 4
-            assert set(counts.values()) == set([2])
+    assert len(counts) == 4
+    assert set(counts.values()) == set([2])
 
 
 @pytest.mark.slow
@@ -1070,7 +1046,7 @@ async def test_stats_measures(dsn):
 
 @pytest.mark.slow
 @pytest.mark.timing
-async def test_stats_usage(dsn, retries):
+async def test_stats_usage(dsn):
     async def worker(n):
         try:
             async with p.connection(timeout=0.3) as conn:
@@ -1078,30 +1054,28 @@ async def test_stats_usage(dsn, retries):
         except pool.PoolTimeout:
             pass
 
-    async for retry in retries:
-        with retry:
-            async with pool.AsyncConnectionPool(dsn, min_size=3) as p:
-                await p.wait(2.0)
-
-                ts = [create_task(worker(i)) for i in range(7)]
-                await asyncio.gather(*ts)
-                stats = p.get_stats()
-                assert stats["requests_num"] == 7
-                assert stats["requests_queued"] == 4
-                assert 850 <= stats["requests_wait_ms"] <= 950
-                assert stats["requests_errors"] == 1
-                assert 1150 <= stats["usage_ms"] <= 1250
-                assert stats.get("returns_bad", 0) == 0
-
-                async with p.connection() as conn:
-                    await conn.close()
-                await p.wait()
-                stats = p.pop_stats()
-                assert stats["requests_num"] == 8
-                assert stats["returns_bad"] == 1
-                async with p.connection():
-                    pass
-                assert p.get_stats()["requests_num"] == 1
+    async with pool.AsyncConnectionPool(dsn, min_size=3) as p:
+        await p.wait(2.0)
+
+        ts = [create_task(worker(i)) for i in range(7)]
+        await asyncio.gather(*ts)
+        stats = p.get_stats()
+        assert stats["requests_num"] == 7
+        assert stats["requests_queued"] == 4
+        assert 850 <= stats["requests_wait_ms"] <= 950
+        assert stats["requests_errors"] == 1
+        assert 1150 <= stats["usage_ms"] <= 1250
+        assert stats.get("returns_bad", 0) == 0
+
+        async with p.connection() as conn:
+            await conn.close()
+        await p.wait()
+        stats = p.pop_stats()
+        assert stats["requests_num"] == 8
+        assert stats["returns_bad"] == 1
+        async with p.connection():
+            pass
+        assert p.get_stats()["requests_num"] == 1
 
 
 @pytest.mark.slow
index 3296faa17f8663e5bc0b60ba8d47e2f13f3972b5..a4b0d4e746236e916043835bd0796811ee1159c0 100644 (file)
@@ -15,7 +15,7 @@ import psycopg
 
 
 @pytest.mark.slow
-def test_concurrent_execution(dsn, retries):
+def test_concurrent_execution(dsn):
     def worker():
         cnn = psycopg.connect(dsn)
         cur = cnn.cursor()
@@ -23,16 +23,14 @@ def test_concurrent_execution(dsn, retries):
         cur.close()
         cnn.close()
 
-    for retry in retries:
-        with retry:
-            t1 = threading.Thread(target=worker)
-            t2 = threading.Thread(target=worker)
-            t0 = time.time()
-            t1.start()
-            t2.start()
-            t1.join()
-            t2.join()
-            assert time.time() - t0 < 0.8, "something broken in concurrency"
+    t1 = threading.Thread(target=worker)
+    t2 = threading.Thread(target=worker)
+    t0 = time.time()
+    t1.start()
+    t2.start()
+    t1.join()
+    t2.join()
+    assert time.time() - t0 < 0.8, "something broken in concurrency"
 
 
 @pytest.mark.slow
@@ -152,7 +150,7 @@ def test_notifies(conn, dsn):
 
 
 @pytest.mark.slow
-def test_cancel(conn, retries):
+def test_cancel(conn):
     def canceller():
         try:
             time.sleep(0.5)
@@ -160,52 +158,48 @@ def test_cancel(conn, retries):
         except Exception as exc:
             errors.append(exc)
 
-    for retry in retries:
-        with retry:
-            errors: List[Exception] = []
+    errors: List[Exception] = []
 
-            cur = conn.cursor()
-            t = threading.Thread(target=canceller)
-            t0 = time.time()
-            t.start()
+    cur = conn.cursor()
+    t = threading.Thread(target=canceller)
+    t0 = time.time()
+    t.start()
 
-            with pytest.raises(psycopg.DatabaseError):
-                cur.execute("select pg_sleep(2)")
+    with pytest.raises(psycopg.DatabaseError):
+        cur.execute("select pg_sleep(2)")
 
-            t1 = time.time()
-            assert not errors
-            assert 0.0 < t1 - t0 < 1.0
+    t1 = time.time()
+    assert not errors
+    assert 0.0 < t1 - t0 < 1.0
 
-            # still working
-            conn.rollback()
-            assert cur.execute("select 1").fetchone()[0] == 1
+    # still working
+    conn.rollback()
+    assert cur.execute("select 1").fetchone()[0] == 1
 
-            t.join()
+    t.join()
 
 
 @pytest.mark.slow
-def test_identify_closure(dsn, retries):
+def test_identify_closure(dsn):
     def closer():
         time.sleep(0.2)
         conn2.execute(
             "select pg_terminate_backend(%s)", [conn.pgconn.backend_pid]
         )
 
-    for retry in retries:
-        with retry:
-            conn = psycopg.connect(dsn)
-            conn2 = psycopg.connect(dsn)
-            try:
-                t = threading.Thread(target=closer)
-                t.start()
-                t0 = time.time()
-                try:
-                    with pytest.raises(psycopg.OperationalError):
-                        conn.execute("select pg_sleep(1.0)")
-                    t1 = time.time()
-                    assert 0.2 < t1 - t0 < 0.4
-                finally:
-                    t.join()
-            finally:
-                conn.close()
-                conn2.close()
+    conn = psycopg.connect(dsn)
+    conn2 = psycopg.connect(dsn)
+    try:
+        t = threading.Thread(target=closer)
+        t.start()
+        t0 = time.time()
+        try:
+            with pytest.raises(psycopg.OperationalError):
+                conn.execute("select pg_sleep(1.0)")
+            t1 = time.time()
+            assert 0.2 < t1 - t0 < 0.4
+        finally:
+            t.join()
+    finally:
+        conn.close()
+        conn2.close()
index 995a3acd50743a9cb9791fdc3cd49636c5747e29..a2710f1d73d5114f0938d63213defebb501d1835 100644 (file)
@@ -42,7 +42,7 @@ async def test_commit_concurrency(aconn):
 
 
 @pytest.mark.slow
-async def test_concurrent_execution(dsn, retries):
+async def test_concurrent_execution(dsn):
     async def worker():
         cnn = await psycopg.AsyncConnection.connect(dsn)
         cur = cnn.cursor()
@@ -50,12 +50,10 @@ async def test_concurrent_execution(dsn, retries):
         await cur.close()
         await cnn.close()
 
-    async for retry in retries:
-        with retry:
-            workers = [worker(), worker()]
-            t0 = time.time()
-            await asyncio.gather(*workers)
-            assert time.time() - t0 < 0.8, "something broken in concurrency"
+    workers = [worker(), worker()]
+    t0 = time.time()
+    await asyncio.gather(*workers)
+    assert time.time() - t0 < 0.8, "something broken in concurrency"
 
 
 @pytest.mark.slow
@@ -102,7 +100,7 @@ async def test_notifies(aconn, dsn):
 
 
 @pytest.mark.slow
-async def test_cancel(aconn, retries):
+async def test_cancel(aconn):
     async def canceller():
         try:
             await asyncio.sleep(0.5)
@@ -115,47 +113,43 @@ async def test_cancel(aconn, retries):
         with pytest.raises(psycopg.DatabaseError):
             await cur.execute("select pg_sleep(2)")
 
-    async for retry in retries:
-        with retry:
-            errors: List[Exception] = []
-            workers = [worker(), canceller()]
+    errors: List[Exception] = []
+    workers = [worker(), canceller()]
 
-            t0 = time.time()
-            await asyncio.gather(*workers)
+    t0 = time.time()
+    await asyncio.gather(*workers)
 
-            t1 = time.time()
-            assert not errors
-            assert 0.0 < t1 - t0 < 1.0
+    t1 = time.time()
+    assert not errors
+    assert 0.0 < t1 - t0 < 1.0
 
-            # still working
-            await aconn.rollback()
-            cur = aconn.cursor()
-            await cur.execute("select 1")
-            assert await cur.fetchone() == (1,)
+    # still working
+    await aconn.rollback()
+    cur = aconn.cursor()
+    await cur.execute("select 1")
+    assert await cur.fetchone() == (1,)
 
 
 @pytest.mark.slow
-async def test_identify_closure(dsn, retries):
+async def test_identify_closure(dsn):
     async def closer():
         await asyncio.sleep(0.2)
         await conn2.execute(
             "select pg_terminate_backend(%s)", [aconn.pgconn.backend_pid]
         )
 
-    async for retry in retries:
-        with retry:
-            aconn = await psycopg.AsyncConnection.connect(dsn)
-            conn2 = await psycopg.AsyncConnection.connect(dsn)
-            try:
-                t = create_task(closer())
-                t0 = time.time()
-                try:
-                    with pytest.raises(psycopg.OperationalError):
-                        await aconn.execute("select pg_sleep(1.0)")
-                    t1 = time.time()
-                    assert 0.2 < t1 - t0 < 0.4
-                finally:
-                    await asyncio.gather(t)
-            finally:
-                await aconn.close()
-                await conn2.close()
+    aconn = await psycopg.AsyncConnection.connect(dsn)
+    conn2 = await psycopg.AsyncConnection.connect(dsn)
+    try:
+        t = create_task(closer())
+        t0 = time.time()
+        try:
+            with pytest.raises(psycopg.OperationalError):
+                await aconn.execute("select pg_sleep(1.0)")
+            t1 = time.time()
+            assert 0.2 < t1 - t0 < 0.4
+        finally:
+            await asyncio.gather(t)
+    finally:
+        await aconn.close()
+        await conn2.close()
index 72b0abfbf14e4ff7845639499aa6ab2609f39f36..285b598b654ded26449a2be07c8117915093f557 100644 (file)
@@ -581,7 +581,7 @@ def test_worker_life(conn, format, buffer):
     [(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)],
 )
 @pytest.mark.parametrize("method", ["read", "iter", "row", "rows"])
-def test_copy_to_leaks(dsn, faker, fmt, set_types, method, retries):
+def test_copy_to_leaks(dsn, faker, fmt, set_types, method):
     faker.format = PyFormat.from_pq(fmt)
     faker.choose_schema(ncols=20)
     faker.make_records(20)
@@ -622,17 +622,15 @@ def test_copy_to_leaks(dsn, faker, fmt, set_types, method, retries):
                         list(copy.rows())
 
     gc_collect()
-    for retry in retries:
-        with retry:
-            n = []
-            for i in range(3):
-                work()
-                gc_collect()
-                n.append(len(gc.get_objects()))
+    n = []
+    for i in range(3):
+        work()
+        gc_collect()
+        n.append(len(gc.get_objects()))
 
-            assert (
-                n[0] == n[1] == n[2]
-            ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+    assert (
+        n[0] == n[1] == n[2]
+    ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
 
 
 @pytest.mark.slow
@@ -640,7 +638,7 @@ def test_copy_to_leaks(dsn, faker, fmt, set_types, method, retries):
     "fmt, set_types",
     [(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)],
 )
-def test_copy_from_leaks(dsn, faker, fmt, set_types, retries):
+def test_copy_from_leaks(dsn, faker, fmt, set_types):
     faker.format = PyFormat.from_pq(fmt)
     faker.choose_schema(ncols=20)
     faker.make_records(20)
@@ -669,17 +667,15 @@ def test_copy_from_leaks(dsn, faker, fmt, set_types, retries):
                     faker.assert_record(got, want)
 
     gc_collect()
-    for retry in retries:
-        with retry:
-            n = []
-            for i in range(3):
-                work()
-                gc_collect()
-                n.append(len(gc.get_objects()))
-
-            assert (
-                n[0] == n[1] == n[2]
-            ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+    n = []
+    for i in range(3):
+        work()
+        gc_collect()
+        n.append(len(gc.get_objects()))
+
+    assert (
+        n[0] == n[1] == n[2]
+    ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
 
 
 def py_to_raw(item, fmt):
index 717da3fa695e52c16c1855efc8427dcc021d806e..4b78ce59147c724b2399c6c4db8b1422d4e4cf49 100644 (file)
@@ -574,7 +574,7 @@ async def test_worker_life(aconn, format, buffer):
     [(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)],
 )
 @pytest.mark.parametrize("method", ["read", "iter", "row", "rows"])
-async def test_copy_to_leaks(dsn, faker, fmt, set_types, method, retries):
+async def test_copy_to_leaks(dsn, faker, fmt, set_types, method):
     faker.format = PyFormat.from_pq(fmt)
     faker.choose_schema(ncols=20)
     faker.make_records(20)
@@ -615,17 +615,15 @@ async def test_copy_to_leaks(dsn, faker, fmt, set_types, method, retries):
                         await alist(copy.rows())
 
     gc_collect()
-    async for retry in retries:
-        with retry:
-            n = []
-            for i in range(3):
-                await work()
-                gc_collect()
-                n.append(len(gc.get_objects()))
+    n = []
+    for i in range(3):
+        await work()
+        gc_collect()
+        n.append(len(gc.get_objects()))
 
-            assert (
-                n[0] == n[1] == n[2]
-            ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+    assert (
+        n[0] == n[1] == n[2]
+    ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
 
 
 @pytest.mark.slow
@@ -633,7 +631,7 @@ async def test_copy_to_leaks(dsn, faker, fmt, set_types, method, retries):
     "fmt, set_types",
     [(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)],
 )
-async def test_copy_from_leaks(dsn, faker, fmt, set_types, retries):
+async def test_copy_from_leaks(dsn, faker, fmt, set_types):
     faker.format = PyFormat.from_pq(fmt)
     faker.choose_schema(ncols=20)
     faker.make_records(20)
@@ -662,17 +660,15 @@ async def test_copy_from_leaks(dsn, faker, fmt, set_types, retries):
                     faker.assert_record(got, want)
 
     gc_collect()
-    async for retry in retries:
-        with retry:
-            n = []
-            for i in range(3):
-                await work()
-                gc_collect()
-                n.append(len(gc.get_objects()))
-
-            assert (
-                n[0] == n[1] == n[2]
-            ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+    n = []
+    for i in range(3):
+        await work()
+        gc_collect()
+        n.append(len(gc.get_objects()))
+
+    assert (
+        n[0] == n[1] == n[2]
+    ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
 
 
 async def ensure_table(cur, tabledef, name="copy_in"):
index fae1d70cc1727976af9e7f1e5b714256f96dfb5a..e202fe6d3b8b1139296d7c59fa266b58a45b796d 100644 (file)
@@ -759,7 +759,7 @@ def test_str(conn):
 @pytest.mark.parametrize(
     "row_factory", ["tuple_row", "dict_row", "namedtuple_row"]
 )
-def test_leak(dsn, faker, fmt, fmt_out, fetch, row_factory, retries):
+def test_leak(dsn, faker, fmt, fmt_out, fetch, row_factory):
     faker.format = fmt
     faker.choose_schema(ncols=5)
     faker.make_records(10)
@@ -791,17 +791,15 @@ def test_leak(dsn, faker, fmt, fmt_out, fetch, row_factory, retries):
                     for rec in cur:
                         pass
 
-    for retry in retries:
-        with retry:
-            n = []
-            gc_collect()
-            for i in range(3):
-                work()
-                gc_collect()
-                n.append(len(gc.get_objects()))
-            assert (
-                n[0] == n[1] == n[2]
-            ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+    n = []
+    gc_collect()
+    for i in range(3):
+        work()
+        gc_collect()
+        n.append(len(gc.get_objects()))
+    assert (
+        n[0] == n[1] == n[2]
+    ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
 
 
 def my_row_factory(
index fc526464ac9b41ec2691d59658e8a31e14552fc8..c55130298b72598d38b5900452c3678858802468 100644 (file)
@@ -640,7 +640,7 @@ async def test_str(aconn):
 @pytest.mark.parametrize(
     "row_factory", ["tuple_row", "dict_row", "namedtuple_row"]
 )
-async def test_leak(dsn, faker, fmt, fmt_out, fetch, row_factory, retries):
+async def test_leak(dsn, faker, fmt, fmt_out, fetch, row_factory):
     faker.format = fmt
     faker.choose_schema(ncols=5)
     faker.make_records(10)
@@ -673,15 +673,13 @@ async def test_leak(dsn, faker, fmt, fmt_out, fetch, row_factory, retries):
                     async for rec in cur:
                         pass
 
-    async for retry in retries:
-        with retry:
-            n = []
-            gc_collect()
-            for i in range(3):
-                await work()
-                gc_collect()
-                n.append(len(gc.get_objects()))
-
-            assert (
-                n[0] == n[1] == n[2]
-            ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+    n = []
+    gc_collect()
+    for i in range(3):
+        await work()
+        gc_collect()
+        n.append(len(gc.get_objects()))
+
+    assert (
+        n[0] == n[1] == n[2]
+    ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
index 383a9cbf60ddcfb740dd42977a9cc5dda132fa0a..7d0856f2887844021a72980688b9d85777dd8559 100644 (file)
@@ -46,34 +46,28 @@ samples_ok = [
 
 
 @pytest.mark.parametrize("conninfo, want, env", samples_ok)
-def test_srv(conninfo, want, env, fake_srv, retries, monkeypatch):
+def test_srv(conninfo, want, env, fake_srv, monkeypatch):
     if env:
         for k, v in env.items():
             monkeypatch.setenv(k, v)
-    # retries are needed because weight order is random, although wrong order
-    # is unlikely.
-    for retry in retries:
-        with retry:
-            params = conninfo_to_dict(conninfo)
-            params = psycopg._dns.resolve_srv(params)  # type: ignore[attr-defined]
-            assert conninfo_to_dict(want) == params
+    # Note: This test is flakey because weight order is random, although wrong
+    # order is unlikely.
+    params = conninfo_to_dict(conninfo)
+    params = psycopg._dns.resolve_srv(params)  # type: ignore[attr-defined]
+    assert conninfo_to_dict(want) == params
 
 
 @pytest.mark.asyncio
 @pytest.mark.parametrize("conninfo, want, env", samples_ok)
-async def test_srv_async(conninfo, want, env, afake_srv, retries, monkeypatch):
+async def test_srv_async(conninfo, want, env, afake_srv, monkeypatch):
     if env:
         for k, v in env.items():
             monkeypatch.setenv(k, v)
-    async for retry in retries:
-        with retry:
-            params = conninfo_to_dict(conninfo)
-            params = await (
-                psycopg._dns.resolve_srv_async(  # type: ignore[attr-defined]
-                    params
-                )
-            )
-            assert conninfo_to_dict(want) == params
+    params = conninfo_to_dict(conninfo)
+    params = await (
+        psycopg._dns.resolve_srv_async(params)  # type: ignore[attr-defined]
+    )
+    assert conninfo_to_dict(want) == params
 
 
 samples_bad = [
index 664b0ff59e1a263b1d547b34eeaa883870b86127..9182947dfd060ae00e32c2fcf635801cbdde7dc7 100644 (file)
@@ -103,23 +103,21 @@ def test_binary_cursor_text_override(conn):
     cur.close()
 
 
-def test_close(conn, recwarn, retries):
-    for retry in retries:
-        with retry:
-            if conn.info.transaction_status == conn.TransactionStatus.INTRANS:
-                # connection dirty from previous failure
-                conn.execute("close foo")
-            recwarn.clear()
-            cur = conn.cursor("foo")
-            cur.execute("select generate_series(1, 10) as bar")
-            cur.close()
-            assert cur.closed
-
-            assert not conn.execute(
-                "select * from pg_cursors where name = 'foo'"
-            ).fetchone()
-            del cur
-            assert not recwarn, [str(w.message) for w in recwarn.list]
+def test_close(conn, recwarn):
+    if conn.info.transaction_status == conn.TransactionStatus.INTRANS:
+        # connection dirty from previous failure
+        conn.execute("close foo")
+    recwarn.clear()
+    cur = conn.cursor("foo")
+    cur.execute("select generate_series(1, 10) as bar")
+    cur.close()
+    assert cur.closed
+
+    assert not conn.execute(
+        "select * from pg_cursors where name = 'foo'"
+    ).fetchone()
+    del cur
+    assert not recwarn, [str(w.message) for w in recwarn.list]
 
 
 def test_close_idempotent(conn):
@@ -183,13 +181,11 @@ def test_cursor_close_fetchall(conn):
         cur.fetchall()
 
 
-def test_close_noop(conn, recwarn, retries):
-    for retry in retries:
-        with retry:
-            recwarn.clear()
-            cur = conn.cursor("foo")
-            cur.close()
-            assert not recwarn, [str(w.message) for w in recwarn.list]
+def test_close_noop(conn, recwarn):
+    recwarn.clear()
+    cur = conn.cursor("foo")
+    cur.close()
+    assert not recwarn, [str(w.message) for w in recwarn.list]
 
 
 def test_close_on_error(conn):
@@ -209,19 +205,17 @@ def test_pgresult(conn):
     assert not cur.pgresult
 
 
-def test_context(conn, recwarn, retries):
-    for retry in retries:
-        with retry:
-            recwarn.clear()
-            with conn.cursor("foo") as cur:
-                cur.execute("select generate_series(1, 10) as bar")
+def test_context(conn, recwarn):
+    recwarn.clear()
+    with conn.cursor("foo") as cur:
+        cur.execute("select generate_series(1, 10) as bar")
 
-            assert cur.closed
-            assert not conn.execute(
-                "select * from pg_cursors where name = 'foo'"
-            ).fetchone()
-            del cur
-            assert not recwarn, [str(w.message) for w in recwarn.list]
+    assert cur.closed
+    assert not conn.execute(
+        "select * from pg_cursors where name = 'foo'"
+    ).fetchone()
+    del cur
+    assert not recwarn, [str(w.message) for w in recwarn.list]
 
 
 def test_close_no_clobber(conn):
@@ -230,14 +224,12 @@ def test_close_no_clobber(conn):
             cur.execute("select 1 / %s", (0,))
 
 
-def test_warn_close(conn, recwarn, retries):
-    for retry in retries:
-        with retry:
-            recwarn.clear()
-            cur = conn.cursor("foo")
-            cur.execute("select generate_series(1, 10) as bar")
-            del cur
-            assert ".close()" in str(recwarn.pop(ResourceWarning).message)
+def test_warn_close(conn, recwarn):
+    recwarn.clear()
+    cur = conn.cursor("foo")
+    cur.execute("select generate_series(1, 10) as bar")
+    del cur
+    assert ".close()" in str(recwarn.pop(ResourceWarning).message)
 
 
 def test_execute_reuse(conn):
index 29ac8a56283f35ee157194bed33bc99eda94ea19..730a39887626acb768d2327e299798bbf537fa5a 100644 (file)
@@ -105,28 +105,21 @@ async def test_binary_cursor_text_override(aconn):
     await cur.close()
 
 
-async def test_close(aconn, recwarn, retries):
-    async for retry in retries:
-        with retry:
-            if (
-                aconn.info.transaction_status
-                == aconn.TransactionStatus.INTRANS
-            ):
-                # connection dirty from previous failure
-                await aconn.execute("close foo")
-            recwarn.clear()
-            cur = aconn.cursor("foo")
-            await cur.execute("select generate_series(1, 10) as bar")
-            await cur.close()
-            assert cur.closed
-
-            assert not await (
-                await aconn.execute(
-                    "select * from pg_cursors where name = 'foo'"
-                )
-            ).fetchone()
-            del cur
-            assert not recwarn, [str(w.message) for w in recwarn.list]
+async def test_close(aconn, recwarn):
+    if aconn.info.transaction_status == aconn.TransactionStatus.INTRANS:
+        # connection dirty from previous failure
+        await aconn.execute("close foo")
+    recwarn.clear()
+    cur = aconn.cursor("foo")
+    await cur.execute("select generate_series(1, 10) as bar")
+    await cur.close()
+    assert cur.closed
+
+    assert not await (
+        await aconn.execute("select * from pg_cursors where name = 'foo'")
+    ).fetchone()
+    del cur
+    assert not recwarn, [str(w.message) for w in recwarn.list]
 
 
 async def test_close_idempotent(aconn):
@@ -190,13 +183,11 @@ async def test_cursor_close_fetchall(aconn):
         await cur.fetchall()
 
 
-async def test_close_noop(aconn, recwarn, retries):
-    async for retry in retries:
-        with retry:
-            recwarn.clear()
-            cur = aconn.cursor("foo")
-            await cur.close()
-            assert not recwarn, [str(w.message) for w in recwarn.list]
+async def test_close_noop(aconn, recwarn):
+    recwarn.clear()
+    cur = aconn.cursor("foo")
+    await cur.close()
+    assert not recwarn, [str(w.message) for w in recwarn.list]
 
 
 async def test_close_on_error(aconn):
@@ -216,21 +207,17 @@ async def test_pgresult(aconn):
     assert not cur.pgresult
 
 
-async def test_context(aconn, recwarn, retries):
-    async for retry in retries:
-        with retry:
-            recwarn.clear()
-            async with aconn.cursor("foo") as cur:
-                await cur.execute("select generate_series(1, 10) as bar")
+async def test_context(aconn, recwarn):
+    recwarn.clear()
+    async with aconn.cursor("foo") as cur:
+        await cur.execute("select generate_series(1, 10) as bar")
 
-            assert cur.closed
-            assert not await (
-                await aconn.execute(
-                    "select * from pg_cursors where name = 'foo'"
-                )
-            ).fetchone()
-            del cur
-            assert not recwarn, [str(w.message) for w in recwarn.list]
+    assert cur.closed
+    assert not await (
+        await aconn.execute("select * from pg_cursors where name = 'foo'")
+    ).fetchone()
+    del cur
+    assert not recwarn, [str(w.message) for w in recwarn.list]
 
 
 async def test_close_no_clobber(aconn):
@@ -239,14 +226,12 @@ async def test_close_no_clobber(aconn):
             await cur.execute("select 1 / %s", (0,))
 
 
-async def test_warn_close(aconn, recwarn, retries):
-    async for retry in retries:
-        with retry:
-            recwarn.clear()
-            cur = aconn.cursor("foo")
-            await cur.execute("select generate_series(1, 10) as bar")
-            del cur
-            assert ".close()" in str(recwarn.pop(ResourceWarning).message)
+async def test_warn_close(aconn, recwarn):
+    recwarn.clear()
+    cur = aconn.cursor("foo")
+    await cur.execute("select generate_series(1, 10) as bar")
+    del cur
+    assert ".close()" in str(recwarn.pop(ResourceWarning).message)
 
 
 async def test_execute_reuse(aconn):
index 10e99bbbc0f698e7370d09b279a3799c7310a80e..aa79d1a591f09ee0ec3e3afeb35302bd22778ff1 100644 (file)
@@ -28,12 +28,10 @@ skip_if_not_linux = pytest.mark.skipif(
 
 
 @pytest.mark.parametrize("timeout", timeouts)
-def test_wait_conn(dsn, timeout, retries):
-    for retry in retries:
-        with retry:
-            gen = generators.connect(dsn)
-            conn = waiting.wait_conn(gen, **timeout)
-            assert conn.status == ConnStatus.OK
+def test_wait_conn(dsn, timeout):
+    gen = generators.connect(dsn)
+    conn = waiting.wait_conn(gen, **timeout)
+    assert conn.status == ConnStatus.OK
 
 
 def test_wait_conn_bad(dsn):