From: Daniele Varrazzo Date: Tue, 7 Jun 2022 22:02:19 +0000 (+0200) Subject: test(crdb): add tests related to session termination X-Git-Tag: 3.1~49^2~14 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=7a6e042cd43c5c0f7eaf09e3572f410814d2bc3c;p=thirdparty%2Fpsycopg.git test(crdb): add tests related to session termination --- diff --git a/tests/crdb/test_connection.py b/tests/crdb/test_connection.py index 724728f69..e748ca330 100644 --- a/tests/crdb/test_connection.py +++ b/tests/crdb/test_connection.py @@ -1,3 +1,6 @@ +import time +import threading + import psycopg.crdb from psycopg import errors as e from psycopg.crdb import CrdbConnection @@ -36,3 +39,38 @@ def test_tpc_recover(dsn): with CrdbConnection.connect(dsn) as conn: with pytest.raises(e.NotSupportedError): conn.tpc_recover() + + +def test_broken(conn): + (session_id,) = conn.execute("show session_id").fetchone() + with pytest.raises(psycopg.OperationalError): + conn.execute("cancel session %s", [session_id]) + + assert conn.closed + assert conn.broken + conn.close() + assert conn.closed + assert conn.broken + + +@pytest.mark.slow +def test_identify_closure(conn_cls, dsn): + with conn_cls.connect(dsn, autocommit=True) as conn: + with conn_cls.connect(dsn, autocommit=True) as conn2: + (session_id,) = conn.execute("show session_id").fetchone() + + def closer(): + time.sleep(0.2) + conn2.execute("cancel session %s", [session_id]) + + t = threading.Thread(target=closer) + t.start() + t0 = time.time() + try: + with pytest.raises(psycopg.OperationalError): + conn.execute("select pg_sleep(3.0)") + dt = time.time() - t0 + # CRDB seems to take not less than 1s + assert 0.2 < dt < 2 + finally: + t.join() diff --git a/tests/crdb/test_connection_async.py b/tests/crdb/test_connection_async.py index 9d3d16e84..b400619fd 100644 --- a/tests/crdb/test_connection_async.py +++ b/tests/crdb/test_connection_async.py @@ -1,6 +1,10 @@ +import time +import asyncio + import psycopg.crdb from psycopg import errors as e from psycopg.crdb import AsyncCrdbConnection +from psycopg._compat import create_task import pytest @@ -33,3 +37,38 @@ async def test_tpc_recover(dsn): async with await AsyncCrdbConnection.connect(dsn) as conn: with pytest.raises(e.NotSupportedError): await conn.tpc_recover() + + +async def test_broken(aconn): + cur = await aconn.execute("show session_id") + (session_id,) = await cur.fetchone() + with pytest.raises(psycopg.OperationalError): + await aconn.execute("cancel session %s", [session_id]) + + assert aconn.closed + assert aconn.broken + await aconn.close() + assert aconn.closed + assert aconn.broken + + +@pytest.mark.slow +async def test_identify_closure(aconn_cls, dsn): + async with await aconn_cls.connect(dsn) as conn: + async with await aconn_cls.connect(dsn) as conn2: + cur = await conn.execute("show session_id") + (session_id,) = await cur.fetchone() + + async def closer(): + await asyncio.sleep(0.2) + await conn2.execute("cancel session %s", [session_id]) + + t = create_task(closer()) + t0 = time.time() + try: + with pytest.raises(psycopg.OperationalError): + await conn.execute("select pg_sleep(3.0)") + dt = time.time() - t0 + assert 0.2 < dt < 2 + finally: + await asyncio.gather(t)