TODO
+ .. automethod:: fileno
+
.. autoattribute:: prepare_threshold
:annotation: Optional[int]
# implement the AdaptContext protocol
return self
+ def fileno(self) -> int:
+ """Return the file descriptor of the connection.
+
+ This function allows to use the connection as file-like object in
+ functions waiting for readiness, such as the ones defined in the
+ `selectors` module.
+ """
+ try:
+ return self.pgconn.socket
+ except pq.PQerror as exc:
+ raise e.OperationalError(str(exc))
+
def cancel(self) -> None:
"""Cancel the current operation on the connection."""
c = self.pgconn.get_cancel()
import time
import queue
import pytest
+import selectors
import threading
import subprocess as sp
# still working
conn.rollback()
assert cur.execute("select 1").fetchone()[0] == 1
+
+
+@pytest.mark.slow
+def test_identify_closure(conn, dsn):
+ conn2 = psycopg3.connect(dsn)
+
+ def closer():
+ time.sleep(0.3)
+ conn2.execute(
+ "select pg_terminate_backend(%s)", [conn.pgconn.backend_pid]
+ )
+
+ t0 = time.time()
+ sel = selectors.DefaultSelector()
+ sel.register(conn, selectors.EVENT_READ)
+ t = threading.Thread(target=closer)
+ t.start()
+
+ assert sel.select(timeout=1.0)
+ with pytest.raises(psycopg3.OperationalError):
+ conn.execute("select 1")
+ t1 = time.time()
+ assert 0.3 < t1 - t0 < 0.5
cur = aconn.cursor()
await cur.execute("select 1")
assert await cur.fetchone() == (1,)
+
+
+@pytest.mark.slow
+async def test_identify_closure(aconn, dsn):
+ conn2 = await psycopg3.AsyncConnection.connect(dsn)
+
+ async def closer():
+ await asyncio.sleep(0.3)
+ await conn2.execute(
+ "select pg_terminate_backend(%s)", [aconn.pgconn.backend_pid]
+ )
+
+ t0 = time.time()
+ ev = asyncio.Event()
+ loop = asyncio.get_event_loop()
+ loop.add_reader(aconn.fileno(), ev.set)
+ asyncio.ensure_future(closer())
+
+ await asyncio.wait_for(ev.wait(), 1.0)
+ with pytest.raises(psycopg3.OperationalError):
+ await aconn.execute("select 1")
+ t1 = time.time()
+ assert 0.3 < t1 - t0 < 0.5
assert "[IDLE]" in str(conn)
conn.close()
assert "[BAD]" in str(conn)
+
+
+def test_fileno(conn):
+ assert conn.fileno() == conn.pgconn.socket
+ conn.close()
+ with pytest.raises(psycopg3.OperationalError):
+ conn.fileno()
assert "[IDLE]" in str(aconn)
await aconn.close()
assert "[BAD]" in str(aconn)
+
+
+async def test_fileno(aconn):
+ assert aconn.fileno() == aconn.pgconn.socket
+ await aconn.close()
+ with pytest.raises(psycopg3.OperationalError):
+ aconn.fileno()