@property
def info(self) -> List["ConninfoOption"]:
- if not self.pgconn_ptr:
- raise PQerror("the connection is closed")
+ self._ensure_pgconn()
opts = impl.PQconninfo(self.pgconn_ptr)
if not opts:
raise MemoryError("couldn't allocate connection info")
impl.PQconninfoFree(opts)
def reset(self) -> None:
- if not self.pgconn_ptr:
- raise PQerror("the connection is no more available")
+ self._ensure_pgconn()
impl.PQreset(self.pgconn_ptr)
def reset_start(self) -> None:
return TransactionStatus(rv)
def parameter_status(self, name: bytes) -> Optional[bytes]:
- if not self.pgconn_ptr:
- raise PQerror("the connection is closed")
+ self._ensure_pgconn()
return impl.PQparameterStatus(self.pgconn_ptr, name)
@property
def exec_(self, command: bytes) -> "PGresult":
if not isinstance(command, bytes):
raise TypeError(f"bytes expected, got {type(command)} instead")
+ self._ensure_pgconn()
rv = impl.PQexec(self.pgconn_ptr, command)
if not rv:
raise MemoryError("couldn't allocate PGresult")
args = self._query_params_args(
command, param_values, param_types, param_formats, result_format
)
+ self._ensure_pgconn()
rv = impl.PQexecParams(*args)
if not rv:
raise MemoryError("couldn't allocate PGresult")
args = self._query_params_args(
command, param_values, param_types, param_formats, result_format
)
+ self._ensure_pgconn()
if not impl.PQsendQueryParams(*args):
raise PQerror(
f"sending query and params failed: {error_message(self)}"
nparams = len(param_types)
atypes = (impl.Oid * nparams)(*param_types)
+ self._ensure_pgconn()
rv = impl.PQprepare(self.pgconn_ptr, name, command, nparams, atypes)
if not rv:
raise MemoryError("couldn't allocate PGresult")
)
aformats = (c_int * nparams)(*param_formats)
+ self._ensure_pgconn()
rv = impl.PQexecPrepared(
self.pgconn_ptr,
name,
def describe_prepared(self, name: bytes) -> "PGresult":
if not isinstance(name, bytes):
raise TypeError(f"'name' must be bytes, got {type(name)} instead")
+ self._ensure_pgconn()
rv = impl.PQdescribePrepared(self.pgconn_ptr, name)
if not rv:
raise MemoryError("couldn't allocate PGresult")
def describe_portal(self, name: bytes) -> "PGresult":
if not isinstance(name, bytes):
raise TypeError(f"'name' must be bytes, got {type(name)} instead")
+ self._ensure_pgconn()
rv = impl.PQdescribePortal(self.pgconn_ptr, name)
if not rv:
raise MemoryError("couldn't allocate PGresult")
raise PQerror("the connection is closed")
return bool(func(self.pgconn_ptr))
+ def _ensure_pgconn(self) -> None:
+ if not self.pgconn_ptr:
+ raise PQerror("the connection is closed")
+
class PGresult:
__slots__ = ("pgresult_ptr",)
import pytest
+import psycopg3
+
def test_exec_none(pq, pgconn):
with pytest.raises(TypeError):
def test_exec(pq, pgconn):
res = pgconn.exec_(b"select 'hel' || 'lo'")
assert res.get_value(0, 0) == b"hello"
+ pgconn.finish()
+ with pytest.raises(psycopg3.OperationalError):
+ pgconn.exec_(b"select 'hello'")
def test_exec_params(pq, pgconn):
res = pgconn.exec_params(b"select $1::int + $2", [b"5", b"3"])
assert res.status == pq.ExecStatus.TUPLES_OK
assert res.get_value(0, 0) == b"8"
+ pgconn.finish()
+ with pytest.raises(psycopg3.OperationalError):
+ pgconn.exec_params(b"select $1::int + $2", [b"5", b"3"])
def test_exec_params_empty(pq, pgconn):
res = pgconn.exec_prepared(b"prep", [b"3", b"5"])
assert res.get_value(0, 0) == b"8"
+ pgconn.finish()
+ with pytest.raises(psycopg3.OperationalError):
+ pgconn.prepare(b"prep", b"select $1::int + $2::int")
+ with pytest.raises(psycopg3.OperationalError):
+ pgconn.exec_prepared(b"prep", [b"3", b"5"])
+
def test_prepare_types(pq, pgconn):
res = pgconn.prepare(b"prep", b"select $1 + $2", [23, 23])
assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
assert res.nfields == 1
assert res.fname(0) == b"foo"
+
+ pgconn.finish()
+ with pytest.raises(psycopg3.OperationalError):
+ pgconn.describe_portal(b"cur")