]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
test(crdb): add tests related to session termination
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Tue, 7 Jun 2022 22:02:19 +0000 (00:02 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Tue, 12 Jul 2022 11:58:34 +0000 (12:58 +0100)
tests/crdb/test_connection.py
tests/crdb/test_connection_async.py

index 724728f69d320d9f18179698b4251a872a6b25b4..e748ca330223417b61be023b977def124b43de46 100644 (file)
@@ -1,3 +1,6 @@
+import time
+import threading
+
 import psycopg.crdb
 from psycopg import errors as e
 from psycopg.crdb import CrdbConnection
@@ -36,3 +39,38 @@ def test_tpc_recover(dsn):
     with CrdbConnection.connect(dsn) as conn:
         with pytest.raises(e.NotSupportedError):
             conn.tpc_recover()
+
+
+def test_broken(conn):
+    (session_id,) = conn.execute("show session_id").fetchone()
+    with pytest.raises(psycopg.OperationalError):
+        conn.execute("cancel session %s", [session_id])
+
+    assert conn.closed
+    assert conn.broken
+    conn.close()
+    assert conn.closed
+    assert conn.broken
+
+
+@pytest.mark.slow
+def test_identify_closure(conn_cls, dsn):
+    with conn_cls.connect(dsn, autocommit=True) as conn:
+        with conn_cls.connect(dsn, autocommit=True) as conn2:
+            (session_id,) = conn.execute("show session_id").fetchone()
+
+            def closer():
+                time.sleep(0.2)
+                conn2.execute("cancel session %s", [session_id])
+
+            t = threading.Thread(target=closer)
+            t.start()
+            t0 = time.time()
+            try:
+                with pytest.raises(psycopg.OperationalError):
+                    conn.execute("select pg_sleep(3.0)")
+                dt = time.time() - t0
+                # CRDB seems to take not less than 1s
+                assert 0.2 < dt < 2
+            finally:
+                t.join()
index 9d3d16e84da0554c843c657230816f287de32106..b400619fd176d246cb797de9ed4438dd364e23f7 100644 (file)
@@ -1,6 +1,10 @@
+import time
+import asyncio
+
 import psycopg.crdb
 from psycopg import errors as e
 from psycopg.crdb import AsyncCrdbConnection
+from psycopg._compat import create_task
 
 import pytest
 
@@ -33,3 +37,38 @@ async def test_tpc_recover(dsn):
     async with await AsyncCrdbConnection.connect(dsn) as conn:
         with pytest.raises(e.NotSupportedError):
             await conn.tpc_recover()
+
+
+async def test_broken(aconn):
+    cur = await aconn.execute("show session_id")
+    (session_id,) = await cur.fetchone()
+    with pytest.raises(psycopg.OperationalError):
+        await aconn.execute("cancel session %s", [session_id])
+
+    assert aconn.closed
+    assert aconn.broken
+    await aconn.close()
+    assert aconn.closed
+    assert aconn.broken
+
+
+@pytest.mark.slow
+async def test_identify_closure(aconn_cls, dsn):
+    async with await aconn_cls.connect(dsn) as conn:
+        async with await aconn_cls.connect(dsn) as conn2:
+            cur = await conn.execute("show session_id")
+            (session_id,) = await cur.fetchone()
+
+            async def closer():
+                await asyncio.sleep(0.2)
+                await conn2.execute("cancel session %s", [session_id])
+
+            t = create_task(closer())
+            t0 = time.time()
+            try:
+                with pytest.raises(psycopg.OperationalError):
+                    await conn.execute("select pg_sleep(3.0)")
+                dt = time.time() - t0
+                assert 0.2 < dt < 2
+            finally:
+                await asyncio.gather(t)