- Fix wrong escaping of unprintable chars in COPY (nonetheless correctly
interpreted by PostgreSQL).
+- Restore the connection to usable state after an error in `~Cursor.stream()`.
Current release
SINGLE_TUPLE = pq.ExecStatus.SINGLE_TUPLE
PIPELINE_ABORTED = pq.ExecStatus.PIPELINE_ABORTED
+ACTIVE = pq.TransactionStatus.ACTIVE
+
class BaseCursor(Generic[ConnectionType, Row]):
__slots__ = """
yield rec
first = False
except e.Error as ex:
+ # try to get out of ACTIVE state. Just do a single attempt, which
+ # shoud work to recover from an error or query cancelled.
+ if self._pgconn.transaction_status == ACTIVE:
+ try:
+ self._conn.wait(self._stream_fetchone_gen(first))
+ except Exception:
+ pass
+
raise ex.with_traceback(None)
def fetchone(self) -> Optional[Row]:
from typing import Optional, Type, TypeVar, TYPE_CHECKING, overload
from contextlib import asynccontextmanager
+from . import pq
from . import errors as e
from .abc import Query, Params
from .copy import AsyncCopy
yield rec
first = False
except e.Error as ex:
+ # try to get out of ACTIVE state. Just do a single attempt, which
+ # shoud work to recover from an error or query cancelled.
+ if self._pgconn.transaction_status == pq.TransactionStatus.ACTIVE:
+ try:
+ await self._conn.wait(self._stream_fetchone_gen(first))
+ except Exception:
+ pass
+
raise ex.with_traceback(None)
async def fetchone(self) -> Optional[Row]:
import pytest
import psycopg
+from psycopg import errors as e
@pytest.mark.slow
t.join()
+def canceller(conn, errors):
+ try:
+ time.sleep(0.5)
+ conn.cancel()
+ except Exception as exc:
+ errors.append(exc)
+
+
@pytest.mark.slow
def test_cancel(conn):
- def canceller():
- try:
- time.sleep(0.5)
- conn.cancel()
- except Exception as exc:
- errors.append(exc)
-
errors: List[Exception] = []
cur = conn.cursor()
- t = threading.Thread(target=canceller)
+ t = threading.Thread(target=canceller, args=(conn, errors))
t0 = time.time()
t.start()
- with pytest.raises(psycopg.DatabaseError):
+ with pytest.raises(e.QueryCanceled):
cur.execute("select pg_sleep(2)")
t1 = time.time()
t.join()
+@pytest.mark.slow
+def test_cancel_stream(conn):
+ errors: List[Exception] = []
+
+ cur = conn.cursor()
+ t = threading.Thread(target=canceller, args=(conn, errors))
+ t0 = time.time()
+ t.start()
+
+ with pytest.raises(e.QueryCanceled):
+ for row in cur.stream("select pg_sleep(2)"):
+ pass
+
+ t1 = time.time()
+ assert not errors
+ assert 0.0 < t1 - t0 < 1.0
+
+ # still working
+ conn.rollback()
+ assert cur.execute("select 1").fetchone()[0] == 1
+
+ t.join()
+
+
@pytest.mark.slow
def test_identify_closure(dsn):
def closer():
import pytest
import psycopg
+from psycopg import errors as e
from psycopg._compat import create_task
pytestmark = pytest.mark.asyncio
assert t1 - t0 == pytest.approx(0.5, abs=0.05)
+async def canceller(aconn, errors):
+ try:
+ await asyncio.sleep(0.5)
+ aconn.cancel()
+ except Exception as exc:
+ errors.append(exc)
+
+
@pytest.mark.slow
async def test_cancel(aconn):
- async def canceller():
- try:
- await asyncio.sleep(0.5)
- aconn.cancel()
- except Exception as exc:
- errors.append(exc)
-
async def worker():
cur = aconn.cursor()
- with pytest.raises(psycopg.DatabaseError):
+ with pytest.raises(e.QueryCanceled):
await cur.execute("select pg_sleep(2)")
errors: List[Exception] = []
- workers = [worker(), canceller()]
+ workers = [worker(), canceller(aconn, errors)]
+
+ t0 = time.time()
+ await asyncio.gather(*workers)
+
+ t1 = time.time()
+ assert not errors
+ assert 0.0 < t1 - t0 < 1.0
+
+ # still working
+ await aconn.rollback()
+ cur = aconn.cursor()
+ await cur.execute("select 1")
+ assert await cur.fetchone() == (1,)
+
+
+@pytest.mark.slow
+async def test_cancel_stream(aconn):
+ async def worker():
+ cur = aconn.cursor()
+ with pytest.raises(e.QueryCanceled):
+ async for row in cur.stream("select pg_sleep(2)"):
+ pass
+
+ errors: List[Exception] = []
+ workers = [worker(), canceller(aconn, errors)]
t0 = time.time()
await asyncio.gather(*workers)
pass
+def test_stream_error_tx(conn):
+ cur = conn.cursor()
+ with pytest.raises(psycopg.ProgrammingError):
+ for rec in cur.stream("wat"):
+ pass
+ assert conn.info.transaction_status == conn.TransactionStatus.INERROR
+
+
+def test_stream_error_notx(conn):
+ conn.autocommit = True
+ cur = conn.cursor()
+ with pytest.raises(psycopg.ProgrammingError):
+ for rec in cur.stream("wat"):
+ pass
+ assert conn.info.transaction_status == conn.TransactionStatus.IDLE
+
+
def test_stream_binary_cursor(conn):
cur = conn.cursor(binary=True)
recs = []
pass
+async def test_stream_error_tx(aconn):
+ cur = aconn.cursor()
+ with pytest.raises(psycopg.ProgrammingError):
+ async for rec in cur.stream("wat"):
+ pass
+ assert aconn.info.transaction_status == aconn.TransactionStatus.INERROR
+
+
+async def test_stream_error_notx(aconn):
+ await aconn.set_autocommit(True)
+ cur = aconn.cursor()
+ with pytest.raises(psycopg.ProgrammingError):
+ async for rec in cur.stream("wat"):
+ pass
+ assert aconn.info.transaction_status == aconn.TransactionStatus.IDLE
+
+
async def test_stream_binary_cursor(aconn):
cur = aconn.cursor(binary=True)
recs = []