]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Retry more flaky tests
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Mon, 12 Jul 2021 16:37:17 +0000 (18:37 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Mon, 12 Jul 2021 22:31:44 +0000 (00:31 +0200)
tests/pool/test_pool.py
tests/pool/test_pool_async.py
tests/test_concurrency.py
tests/test_concurrency_async.py
tests/test_cursor.py
tests/test_cursor_async.py

index 39da2d0b41eaedcf542a23df529a60cb9e11a29f..10509483810e5076364638975de55926f1fbb12a 100644 (file)
@@ -605,10 +605,7 @@ def test_closed_putconn(dsn):
 
 
 @pytest.mark.slow
-def test_closed_queue(dsn):
-    p = pool.ConnectionPool(dsn, min_size=1)
-    success = []
-
+def test_closed_queue(dsn, retries):
     def w1():
         with p.connection() as conn:
             assert (
@@ -622,15 +619,20 @@ def test_closed_queue(dsn):
                 pass
         success.append("w2")
 
-    t1 = Thread(target=w1)
-    t2 = Thread(target=w2)
-    t1.start()
-    sleep(0.1)
-    t2.start()
-    p.close()
-    t1.join()
-    t2.join()
-    assert len(success) == 2
+    for retry in retries:
+        with retry:
+            p = pool.ConnectionPool(dsn, min_size=1)
+            success = []
+
+            t1 = Thread(target=w1)
+            t2 = Thread(target=w2)
+            t1.start()
+            sleep(0.1)
+            t2.start()
+            p.close()
+            t1.join()
+            t2.join()
+            assert len(success) == 2
 
 
 @pytest.mark.slow
@@ -697,7 +699,7 @@ def test_shrink(dsn, monkeypatch):
 
 
 @pytest.mark.slow
-def test_reconnect(proxy, caplog, monkeypatch):
+def test_reconnect(proxy, caplog, monkeypatch, retries):
     caplog.set_level(logging.WARNING, logger="psycopg.pool")
 
     assert pool.base.ConnectionAttempt.INITIAL_DELAY == 1.0
@@ -705,31 +707,35 @@ def test_reconnect(proxy, caplog, monkeypatch):
     monkeypatch.setattr(pool.base.ConnectionAttempt, "INITIAL_DELAY", 0.1)
     monkeypatch.setattr(pool.base.ConnectionAttempt, "DELAY_JITTER", 0.0)
 
-    proxy.start()
-    with pool.ConnectionPool(proxy.client_dsn, min_size=1) as p:
-        p.wait(2.0)
-        proxy.stop()
-
-        with pytest.raises(psycopg.OperationalError):
-            with p.connection() as conn:
-                conn.execute("select 1")
-
-        sleep(1.0)
-        proxy.start()
-        p.wait()
-
-        with p.connection() as conn:
-            conn.execute("select 1")
+    for retry in retries:
+        with retry:
+            proxy.start()
+            with pool.ConnectionPool(proxy.client_dsn, min_size=1) as p:
+                p.wait(2.0)
+                proxy.stop()
 
-    assert "BAD" in caplog.messages[0]
-    times = [rec.created for rec in caplog.records]
-    assert times[1] - times[0] < 0.05
-    deltas = [times[i + 1] - times[i] for i in range(1, len(times) - 1)]
-    assert len(deltas) == 3
-    want = 0.1
-    for delta in deltas:
-        assert delta == pytest.approx(want, 0.05), deltas
-        want *= 2
+                with pytest.raises(psycopg.OperationalError):
+                    with p.connection() as conn:
+                        conn.execute("select 1")
+
+                sleep(1.0)
+                proxy.start()
+                p.wait()
+
+                with p.connection() as conn:
+                    conn.execute("select 1")
+
+            assert "BAD" in caplog.messages[0]
+            times = [rec.created for rec in caplog.records]
+            assert times[1] - times[0] < 0.05
+            deltas = [
+                times[i + 1] - times[i] for i in range(1, len(times) - 1)
+            ]
+            assert len(deltas) == 3
+            want = 0.1
+            for delta in deltas:
+                assert delta == pytest.approx(want, 0.05), deltas
+                want *= 2
 
 
 @pytest.mark.slow
index c3a6ce95dff4bcc8fe08f9e392d0344c400fb25a..bbf859208e561c9d0480c65a418539a126630f59 100644 (file)
@@ -621,10 +621,7 @@ async def test_closed_putconn(dsn):
 
 
 @pytest.mark.slow
-async def test_closed_queue(dsn):
-    p = pool.AsyncConnectionPool(dsn, min_size=1)
-    success = []
-
+async def test_closed_queue(dsn, retries):
     async def w1():
         async with p.connection() as conn:
             res = await conn.execute("select 1 from pg_sleep(0.2)")
@@ -637,12 +634,17 @@ async def test_closed_queue(dsn):
                 pass
         success.append("w2")
 
-    t1 = create_task(w1())
-    await asyncio.sleep(0.1)
-    t2 = create_task(w2())
-    await p.close()
-    await asyncio.gather(t1, t2)
-    assert len(success) == 2
+    async for retry in retries:
+        with retry:
+            p = pool.AsyncConnectionPool(dsn, min_size=1)
+            success = []
+
+            t1 = create_task(w1())
+            await asyncio.sleep(0.1)
+            t2 = create_task(w2())
+            await p.close()
+            await asyncio.gather(t1, t2)
+            assert len(success) == 2
 
 
 @pytest.mark.slow
@@ -711,42 +713,49 @@ async def test_shrink(dsn, monkeypatch):
 
 
 @pytest.mark.slow
-async def test_reconnect(proxy, caplog, monkeypatch):
+async def test_reconnect(proxy, caplog, monkeypatch, retries):
     assert pool.base.ConnectionAttempt.INITIAL_DELAY == 1.0
     assert pool.base.ConnectionAttempt.DELAY_JITTER == 0.1
     monkeypatch.setattr(pool.base.ConnectionAttempt, "INITIAL_DELAY", 0.1)
     monkeypatch.setattr(pool.base.ConnectionAttempt, "DELAY_JITTER", 0.0)
 
-    proxy.start()
-    async with pool.AsyncConnectionPool(proxy.client_dsn, min_size=1) as p:
-        await p.wait(2.0)
-        proxy.stop()
-
-        with pytest.raises(psycopg.OperationalError):
-            async with p.connection() as conn:
-                await conn.execute("select 1")
-
-        await asyncio.sleep(1.0)
-        proxy.start()
-        await p.wait()
-
-        async with p.connection() as conn:
-            await conn.execute("select 1")
+    async for retry in retries:
+        with retry:
+            proxy.start()
+            async with pool.AsyncConnectionPool(
+                proxy.client_dsn, min_size=1
+            ) as p:
+                await p.wait(2.0)
+                proxy.stop()
 
-    recs = [
-        r
-        for r in caplog.records
-        if r.name.startswith("psycopg") and r.levelno >= logging.WARNING
-    ]
-    assert "BAD" in recs[0].message
-    times = [rec.created for rec in recs]
-    assert times[1] - times[0] < 0.05
-    deltas = [times[i + 1] - times[i] for i in range(1, len(times) - 1)]
-    assert len(deltas) == 3
-    want = 0.1
-    for delta in deltas:
-        assert delta == pytest.approx(want, 0.05), deltas
-        want *= 2
+                with pytest.raises(psycopg.OperationalError):
+                    async with p.connection() as conn:
+                        await conn.execute("select 1")
+
+                await asyncio.sleep(1.0)
+                proxy.start()
+                await p.wait()
+
+                async with p.connection() as conn:
+                    await conn.execute("select 1")
+
+            recs = [
+                r
+                for r in caplog.records
+                if r.name.startswith("psycopg")
+                and r.levelno >= logging.WARNING
+            ]
+            assert "BAD" in recs[0].message
+            times = [rec.created for rec in recs]
+            assert times[1] - times[0] < 0.05
+            deltas = [
+                times[i + 1] - times[i] for i in range(1, len(times) - 1)
+            ]
+            assert len(deltas) == 3
+            want = 0.1
+            for delta in deltas:
+                assert delta == pytest.approx(want, 0.05), deltas
+                want *= 2
 
 
 @pytest.mark.slow
index 28de5e583c1cb7f4f6e4e3d4960c37f8e0eaf796..b25bf4b48cdde31b270bfaa145b2c3fc3f23c46e 100644 (file)
@@ -175,23 +175,26 @@ def test_cancel(conn):
 
 
 @pytest.mark.slow
-def test_identify_closure(conn, dsn):
-    conn2 = psycopg.connect(dsn)
-
+def test_identify_closure(dsn, retries):
     def closer():
         time.sleep(0.3)
         conn2.execute(
             "select pg_terminate_backend(%s)", [conn.pgconn.backend_pid]
         )
 
-    t0 = time.time()
-    sel = selectors.DefaultSelector()
-    sel.register(conn, selectors.EVENT_READ)
-    t = threading.Thread(target=closer)
-    t.start()
-
-    assert sel.select(timeout=1.0)
-    with pytest.raises(psycopg.OperationalError):
-        conn.execute("select 1")
-    t1 = time.time()
-    assert 0.3 < t1 - t0 < 0.6
+    for retry in retries:
+        with retry:
+            conn = psycopg.connect(dsn)
+            conn2 = psycopg.connect(dsn)
+
+            t0 = time.time()
+            sel = selectors.DefaultSelector()
+            sel.register(conn, selectors.EVENT_READ)
+            t = threading.Thread(target=closer)
+            t.start()
+
+            assert sel.select(timeout=1.0)
+            with pytest.raises(psycopg.OperationalError):
+                conn.execute("select 1")
+            t1 = time.time()
+            assert 0.3 < t1 - t0 < 0.6
index 3fdab911e762e028a9c11287f81e67d6ae271a4b..190b6478e57896764477f9166c2e2c114d504ded 100644 (file)
@@ -133,23 +133,26 @@ async def test_cancel(aconn):
 
 
 @pytest.mark.slow
-async def test_identify_closure(aconn, dsn):
-    conn2 = await psycopg.AsyncConnection.connect(dsn)
-
+async def test_identify_closure(dsn, retries):
     async def closer():
         await asyncio.sleep(0.3)
         await conn2.execute(
             "select pg_terminate_backend(%s)", [aconn.pgconn.backend_pid]
         )
 
-    t0 = time.time()
-    ev = asyncio.Event()
-    loop = asyncio.get_event_loop()
-    loop.add_reader(aconn.fileno(), ev.set)
-    create_task(closer())
-
-    await asyncio.wait_for(ev.wait(), 1.0)
-    with pytest.raises(psycopg.OperationalError):
-        await aconn.execute("select 1")
-    t1 = time.time()
-    assert 0.3 < t1 - t0 < 0.6
+    async for retry in retries:
+        with retry:
+            aconn = await psycopg.AsyncConnection.connect(dsn)
+            conn2 = await psycopg.AsyncConnection.connect(dsn)
+
+            t0 = time.time()
+            ev = asyncio.Event()
+            loop = asyncio.get_event_loop()
+            loop.add_reader(aconn.fileno(), ev.set)
+            create_task(closer())
+
+            await asyncio.wait_for(ev.wait(), 1.0)
+            with pytest.raises(psycopg.OperationalError):
+                await aconn.execute("select 1")
+            t1 = time.time()
+            assert 0.3 < t1 - t0 < 0.6
index 5c40f6460867199b11ad600275dbbc93932e02cc..af5ffdc9037a818d3650673e36310c08a148f01d 100644 (file)
@@ -543,47 +543,51 @@ def test_str(conn):
 @pytest.mark.parametrize(
     "row_factory", ["tuple_row", "dict_row", "namedtuple_row"]
 )
-def test_leak(dsn, faker, fmt, fmt_out, fetch, row_factory):
+def test_leak(dsn, faker, fmt, fmt_out, fetch, row_factory, retries):
     faker.format = fmt
     faker.choose_schema(ncols=5)
     faker.make_records(10)
     row_factory = getattr(rows, row_factory)
 
-    n = []
-    gc_collect()
-    for i in range(3):
-        with psycopg.connect(dsn) as conn:
-            with conn.cursor(binary=fmt_out, row_factory=row_factory) as cur:
-                cur.execute(faker.drop_stmt)
-                cur.execute(faker.create_stmt)
-                cur.executemany(faker.insert_stmt, faker.records)
-                cur.execute(faker.select_stmt)
-
-                if fetch == "one":
-                    while 1:
-                        tmp = cur.fetchone()
-                        if tmp is None:
-                            break
-                elif fetch == "many":
-                    while 1:
-                        tmp = cur.fetchmany(3)
-                        if not tmp:
-                            break
-                elif fetch == "all":
-                    cur.fetchall()
-                elif fetch == "iter":
-                    for rec in cur:
-                        pass
-
-                tmp = None
-
-        del cur, conn
-        gc_collect()
-        n.append(len(gc.get_objects()))
-
-    assert (
-        n[0] == n[1] == n[2]
-    ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+    for retry in retries:
+        with retry:
+            n = []
+            gc_collect()
+            for i in range(3):
+                with psycopg.connect(dsn) as conn:
+                    with conn.cursor(
+                        binary=fmt_out, row_factory=row_factory
+                    ) as cur:
+                        cur.execute(faker.drop_stmt)
+                        cur.execute(faker.create_stmt)
+                        cur.executemany(faker.insert_stmt, faker.records)
+                        cur.execute(faker.select_stmt)
+
+                        if fetch == "one":
+                            while 1:
+                                tmp = cur.fetchone()
+                                if tmp is None:
+                                    break
+                        elif fetch == "many":
+                            while 1:
+                                tmp = cur.fetchmany(3)
+                                if not tmp:
+                                    break
+                        elif fetch == "all":
+                            cur.fetchall()
+                        elif fetch == "iter":
+                            for rec in cur:
+                                pass
+
+                        tmp = None
+
+                del cur, conn
+                gc_collect()
+                n.append(len(gc.get_objects()))
+
+            assert (
+                n[0] == n[1] == n[2]
+            ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
 
 
 def my_row_factory(cursor):
index 965d99894110233c5782128d5b92e129d3f913bb..7d9dbdd9a0f05d5f28532fbca4a9e2a7641b1a82 100644 (file)
@@ -457,46 +457,48 @@ async def test_str(aconn):
 @pytest.mark.parametrize(
     "row_factory", ["tuple_row", "dict_row", "namedtuple_row"]
 )
-async def test_leak(dsn, faker, fmt, fmt_out, fetch, row_factory):
+async def test_leak(dsn, faker, fmt, fmt_out, fetch, row_factory, retries):
     faker.format = fmt
     faker.choose_schema(ncols=5)
     faker.make_records(10)
     row_factory = getattr(rows, row_factory)
 
-    n = []
-    gc_collect()
-    for i in range(3):
-        async with await psycopg.AsyncConnection.connect(dsn) as conn:
-            async with conn.cursor(
-                binary=fmt_out, row_factory=row_factory
-            ) as cur:
-                await cur.execute(faker.drop_stmt)
-                await cur.execute(faker.create_stmt)
-                await cur.executemany(faker.insert_stmt, faker.records)
-                await cur.execute(faker.select_stmt)
-
-                if fetch == "one":
-                    while 1:
-                        tmp = await cur.fetchone()
-                        if tmp is None:
-                            break
-                elif fetch == "many":
-                    while 1:
-                        tmp = await cur.fetchmany(3)
-                        if not tmp:
-                            break
-                elif fetch == "all":
-                    await cur.fetchall()
-                elif fetch == "iter":
-                    async for rec in cur:
-                        pass
-
-                tmp = None
-
-        del cur, conn
-        gc_collect()
-        n.append(len(gc.get_objects()))
-
-    assert (
-        n[0] == n[1] == n[2]
-    ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+    async for retry in retries:
+        with retry:
+            n = []
+            gc_collect()
+            for i in range(3):
+                async with await psycopg.AsyncConnection.connect(dsn) as conn:
+                    async with conn.cursor(
+                        binary=fmt_out, row_factory=row_factory
+                    ) as cur:
+                        await cur.execute(faker.drop_stmt)
+                        await cur.execute(faker.create_stmt)
+                        await cur.executemany(faker.insert_stmt, faker.records)
+                        await cur.execute(faker.select_stmt)
+
+                        if fetch == "one":
+                            while 1:
+                                tmp = await cur.fetchone()
+                                if tmp is None:
+                                    break
+                        elif fetch == "many":
+                            while 1:
+                                tmp = await cur.fetchmany(3)
+                                if not tmp:
+                                    break
+                        elif fetch == "all":
+                            await cur.fetchall()
+                        elif fetch == "iter":
+                            async for rec in cur:
+                                pass
+
+                        tmp = None
+
+                del cur, conn
+                gc_collect()
+                n.append(len(gc.get_objects()))
+
+            assert (
+                n[0] == n[1] == n[2]
+            ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"