+import time
+import threading
+
import psycopg.crdb
from psycopg import errors as e
from psycopg.crdb import CrdbConnection
with CrdbConnection.connect(dsn) as conn:
with pytest.raises(e.NotSupportedError):
conn.tpc_recover()
+
+
+def test_broken(conn):
+ (session_id,) = conn.execute("show session_id").fetchone()
+ with pytest.raises(psycopg.OperationalError):
+ conn.execute("cancel session %s", [session_id])
+
+ assert conn.closed
+ assert conn.broken
+ conn.close()
+ assert conn.closed
+ assert conn.broken
+
+
+@pytest.mark.slow
+def test_identify_closure(conn_cls, dsn):
+ with conn_cls.connect(dsn, autocommit=True) as conn:
+ with conn_cls.connect(dsn, autocommit=True) as conn2:
+ (session_id,) = conn.execute("show session_id").fetchone()
+
+ def closer():
+ time.sleep(0.2)
+ conn2.execute("cancel session %s", [session_id])
+
+ t = threading.Thread(target=closer)
+ t.start()
+ t0 = time.time()
+ try:
+ with pytest.raises(psycopg.OperationalError):
+ conn.execute("select pg_sleep(3.0)")
+ dt = time.time() - t0
+ # CRDB seems to take not less than 1s
+ assert 0.2 < dt < 2
+ finally:
+ t.join()
+import time
+import asyncio
+
import psycopg.crdb
from psycopg import errors as e
from psycopg.crdb import AsyncCrdbConnection
+from psycopg._compat import create_task
import pytest
async with await AsyncCrdbConnection.connect(dsn) as conn:
with pytest.raises(e.NotSupportedError):
await conn.tpc_recover()
+
+
+async def test_broken(aconn):
+ cur = await aconn.execute("show session_id")
+ (session_id,) = await cur.fetchone()
+ with pytest.raises(psycopg.OperationalError):
+ await aconn.execute("cancel session %s", [session_id])
+
+ assert aconn.closed
+ assert aconn.broken
+ await aconn.close()
+ assert aconn.closed
+ assert aconn.broken
+
+
+@pytest.mark.slow
+async def test_identify_closure(aconn_cls, dsn):
+ async with await aconn_cls.connect(dsn) as conn:
+ async with await aconn_cls.connect(dsn) as conn2:
+ cur = await conn.execute("show session_id")
+ (session_id,) = await cur.fetchone()
+
+ async def closer():
+ await asyncio.sleep(0.2)
+ await conn2.execute("cancel session %s", [session_id])
+
+ t = create_task(closer())
+ t0 = time.time()
+ try:
+ with pytest.raises(psycopg.OperationalError):
+ await conn.execute("select pg_sleep(3.0)")
+ dt = time.time() - t0
+ assert 0.2 < dt < 2
+ finally:
+ await asyncio.gather(t)