from functools import partial
from ctypes import Array, pointer, string_at, create_string_buffer, byref
-from ctypes import c_char_p, c_int, c_size_t, c_ulong
+from ctypes import addressof, c_char_p, c_int, c_size_t, c_ulong
from typing import Any, Callable, List, Optional, Sequence, Tuple
from typing import cast as t_cast, TYPE_CHECKING
except Exception as exc:
logger.exception("error in notice receiver: %s", exc)
- res.pgresult_ptr = None # avoid destroying the pgresult_ptr
+ res._pgresult_ptr = None # avoid destroying the pgresult_ptr
class PGconn:
__module__ = "psycopg3.pq"
__slots__ = (
- "pgconn_ptr",
+ "_pgconn_ptr",
"notice_handler",
"notify_handler",
"_notice_receiver",
)
def __init__(self, pgconn_ptr: impl.PGconn_struct):
- self.pgconn_ptr: Optional[impl.PGconn_struct] = pgconn_ptr
+ self._pgconn_ptr: Optional[impl.PGconn_struct] = pgconn_ptr
self.notice_handler: Optional[
Callable[["proto.PGresult"], None]
] = None
return self._call_int(impl.PQconnectPoll)
def finish(self) -> None:
- self.pgconn_ptr, p = None, self.pgconn_ptr
+ self._pgconn_ptr, p = None, self._pgconn_ptr
if p:
impl.PQfinish(p)
+ @property
+ def pgconn_ptr(self) -> Optional[int]:
+ """The pointer to the underlying ``PGconn`` structure, as integer.
+
+ `!None` if the connection is closed.
+
+ The value can be used to pass the structure to libpq functions which
+ psycopg3 doesn't (currently) wrap, either in C or in Python using FFI
+ libraries such as `ctypes`.
+ """
+ if self._pgconn_ptr is None:
+ return None
+
+ return addressof(self._pgconn_ptr.contents) # type: ignore[attr-defined]
+
@property
def info(self) -> List["ConninfoOption"]:
self._ensure_pgconn()
- opts = impl.PQconninfo(self.pgconn_ptr)
+ opts = impl.PQconninfo(self._pgconn_ptr)
if not opts:
raise MemoryError("couldn't allocate connection info")
try:
def reset(self) -> None:
self._ensure_pgconn()
- impl.PQreset(self.pgconn_ptr)
+ impl.PQreset(self._pgconn_ptr)
def reset_start(self) -> None:
- if not impl.PQresetStart(self.pgconn_ptr):
+ if not impl.PQresetStart(self._pgconn_ptr):
raise e.OperationalError("couldn't reset connection")
def reset_poll(self) -> int:
@property
def status(self) -> int:
- return impl.PQstatus(self.pgconn_ptr)
+ return impl.PQstatus(self._pgconn_ptr)
@property
def transaction_status(self) -> int:
- return impl.PQtransactionStatus(self.pgconn_ptr)
+ return impl.PQtransactionStatus(self._pgconn_ptr)
def parameter_status(self, name: bytes) -> Optional[bytes]:
self._ensure_pgconn()
- return impl.PQparameterStatus(self.pgconn_ptr, name)
+ return impl.PQparameterStatus(self._pgconn_ptr, name)
@property
def error_message(self) -> bytes:
- return impl.PQerrorMessage(self.pgconn_ptr)
+ return impl.PQerrorMessage(self._pgconn_ptr)
@property
def protocol_version(self) -> int:
if not isinstance(command, bytes):
raise TypeError(f"bytes expected, got {type(command)} instead")
self._ensure_pgconn()
- rv = impl.PQexec(self.pgconn_ptr, command)
+ rv = impl.PQexec(self._pgconn_ptr, command)
if not rv:
raise MemoryError("couldn't allocate PGresult")
return PGresult(rv)
if not isinstance(command, bytes):
raise TypeError(f"bytes expected, got {type(command)} instead")
self._ensure_pgconn()
- if not impl.PQsendQuery(self.pgconn_ptr, command):
+ if not impl.PQsendQuery(self._pgconn_ptr, command):
raise e.OperationalError(
f"sending query failed: {error_message(self)}"
)
self._ensure_pgconn()
if not impl.PQsendPrepare(
- self.pgconn_ptr, name, command, nparams, atypes
+ self._pgconn_ptr, name, command, nparams, atypes
):
raise e.OperationalError(
f"sending query and params failed: {error_message(self)}"
aformats = (c_int * nparams)(*param_formats)
return (
- self.pgconn_ptr,
+ self._pgconn_ptr,
command,
nparams,
atypes,
atypes = (impl.Oid * nparams)(*param_types)
self._ensure_pgconn()
- rv = impl.PQprepare(self.pgconn_ptr, name, command, nparams, atypes)
+ rv = impl.PQprepare(self._pgconn_ptr, name, command, nparams, atypes)
if not rv:
raise MemoryError("couldn't allocate PGresult")
return PGresult(rv)
self._ensure_pgconn()
rv = impl.PQexecPrepared(
- self.pgconn_ptr,
+ self._pgconn_ptr,
name,
nparams,
aparams,
if not isinstance(name, bytes):
raise TypeError(f"'name' must be bytes, got {type(name)} instead")
self._ensure_pgconn()
- rv = impl.PQdescribePrepared(self.pgconn_ptr, name)
+ rv = impl.PQdescribePrepared(self._pgconn_ptr, name)
if not rv:
raise MemoryError("couldn't allocate PGresult")
return PGresult(rv)
if not isinstance(name, bytes):
raise TypeError(f"bytes expected, got {type(name)} instead")
self._ensure_pgconn()
- if not impl.PQsendDescribePrepared(self.pgconn_ptr, name):
+ if not impl.PQsendDescribePrepared(self._pgconn_ptr, name):
raise e.OperationalError(
f"sending describe prepared failed: {error_message(self)}"
)
if not isinstance(name, bytes):
raise TypeError(f"'name' must be bytes, got {type(name)} instead")
self._ensure_pgconn()
- rv = impl.PQdescribePortal(self.pgconn_ptr, name)
+ rv = impl.PQdescribePortal(self._pgconn_ptr, name)
if not rv:
raise MemoryError("couldn't allocate PGresult")
return PGresult(rv)
if not isinstance(name, bytes):
raise TypeError(f"bytes expected, got {type(name)} instead")
self._ensure_pgconn()
- if not impl.PQsendDescribePortal(self.pgconn_ptr, name):
+ if not impl.PQsendDescribePortal(self._pgconn_ptr, name):
raise e.OperationalError(
f"sending describe portal failed: {error_message(self)}"
)
def get_result(self) -> Optional["PGresult"]:
- rv = impl.PQgetResult(self.pgconn_ptr)
+ rv = impl.PQgetResult(self._pgconn_ptr)
return PGresult(rv) if rv else None
def consume_input(self) -> None:
- if 1 != impl.PQconsumeInput(self.pgconn_ptr):
+ if 1 != impl.PQconsumeInput(self._pgconn_ptr):
raise e.OperationalError(
f"consuming input failed: {error_message(self)}"
)
def is_busy(self) -> int:
- return impl.PQisBusy(self.pgconn_ptr)
+ return impl.PQisBusy(self._pgconn_ptr)
@property
def nonblocking(self) -> int:
- return impl.PQisnonblocking(self.pgconn_ptr)
+ return impl.PQisnonblocking(self._pgconn_ptr)
@nonblocking.setter
def nonblocking(self, arg: int) -> None:
- if 0 > impl.PQsetnonblocking(self.pgconn_ptr, arg):
+ if 0 > impl.PQsetnonblocking(self._pgconn_ptr, arg):
raise e.OperationalError(
f"setting nonblocking failed: {error_message(self)}"
)
def flush(self) -> int:
- if not self.pgconn_ptr:
+ if not self._pgconn_ptr:
raise e.OperationalError(
"flushing failed: the connection is closed"
)
- rv: int = impl.PQflush(self.pgconn_ptr)
+ rv: int = impl.PQflush(self._pgconn_ptr)
if rv < 0:
raise e.OperationalError(f"flushing failed: {error_message(self)}")
return rv
def set_single_row_mode(self) -> None:
- if not impl.PQsetSingleRowMode(self.pgconn_ptr):
+ if not impl.PQsetSingleRowMode(self._pgconn_ptr):
raise e.OperationalError("setting single row mode failed")
def get_cancel(self) -> "PGcancel":
See :pq:`PQgetCancel` for details.
"""
- rv = impl.PQgetCancel(self.pgconn_ptr)
+ rv = impl.PQgetCancel(self._pgconn_ptr)
if not rv:
raise e.OperationalError("couldn't create cancel object")
return PGcancel(rv)
def notifies(self) -> Optional[PGnotify]:
- ptr = impl.PQnotifies(self.pgconn_ptr)
+ ptr = impl.PQnotifies(self._pgconn_ptr)
if ptr:
c = ptr.contents
return PGnotify(c.relname, c.be_pid, c.extra)
# TODO: should be done without copy
if not isinstance(buffer, bytes):
buffer = bytes(buffer)
- rv = impl.PQputCopyData(self.pgconn_ptr, buffer, len(buffer))
+ rv = impl.PQputCopyData(self._pgconn_ptr, buffer, len(buffer))
if rv < 0:
raise e.OperationalError(
f"sending copy data failed: {error_message(self)}"
return rv
def put_copy_end(self, error: Optional[bytes] = None) -> int:
- rv = impl.PQputCopyEnd(self.pgconn_ptr, error)
+ rv = impl.PQputCopyEnd(self._pgconn_ptr, error)
if rv < 0:
raise e.OperationalError(
f"sending copy end failed: {error_message(self)}"
def get_copy_data(self, async_: int) -> Tuple[int, memoryview]:
buffer_ptr = c_char_p()
- nbytes = impl.PQgetCopyData(self.pgconn_ptr, byref(buffer_ptr), async_)
+ nbytes = impl.PQgetCopyData(
+ self._pgconn_ptr, byref(buffer_ptr), async_
+ )
if nbytes == -2:
raise e.OperationalError(
f"receiving copy data failed: {error_message(self)}"
return nbytes, memoryview(b"")
def make_empty_result(self, exec_status: int) -> "PGresult":
- rv = impl.PQmakeEmptyPGresult(self.pgconn_ptr, exec_status)
+ rv = impl.PQmakeEmptyPGresult(self._pgconn_ptr, exec_status)
if not rv:
raise MemoryError("couldn't allocate empty PGresult")
return PGresult(rv)
"""
Call one of the pgconn libpq functions returning a bytes pointer.
"""
- if not self.pgconn_ptr:
+ if not self._pgconn_ptr:
raise e.OperationalError("the connection is closed")
- rv = func(self.pgconn_ptr)
+ rv = func(self._pgconn_ptr)
assert rv is not None
return rv
"""
Call one of the pgconn libpq functions returning an int.
"""
- if not self.pgconn_ptr:
+ if not self._pgconn_ptr:
raise e.OperationalError("the connection is closed")
- return func(self.pgconn_ptr)
+ return func(self._pgconn_ptr)
def _call_bool(self, func: Callable[[impl.PGconn_struct], int]) -> bool:
"""
Call one of the pgconn libpq functions returning a logical value.
"""
- if not self.pgconn_ptr:
+ if not self._pgconn_ptr:
raise e.OperationalError("the connection is closed")
- return bool(func(self.pgconn_ptr))
+ return bool(func(self._pgconn_ptr))
def _ensure_pgconn(self) -> None:
- if not self.pgconn_ptr:
+ if not self._pgconn_ptr:
raise e.OperationalError("the connection is closed")
"""
__module__ = "psycopg3.pq"
- __slots__ = ("pgresult_ptr",)
+ __slots__ = ("_pgresult_ptr",)
def __init__(self, pgresult_ptr: impl.PGresult_struct):
- self.pgresult_ptr: Optional[impl.PGresult_struct] = pgresult_ptr
+ self._pgresult_ptr: Optional[impl.PGresult_struct] = pgresult_ptr
def __del__(self) -> None:
self.clear()
return f"<{cls} [{status.name}] at 0x{id(self):x}>"
def clear(self) -> None:
- self.pgresult_ptr, p = None, self.pgresult_ptr
+ self._pgresult_ptr, p = None, self._pgresult_ptr
if p:
impl.PQclear(p)
+ @property
+ def pgresult_ptr(self) -> Optional[int]:
+ """The pointer to the underlying ``PGresult`` structure, as integer.
+
+ `!None` if the result waas cleared.
+
+ The value can be used to pass the structure to libpq functions which
+ psycopg3 doesn't (currently) wrap, either in C or in Python using FFI
+ libraries such as `ctypes`.
+ """
+ if self._pgresult_ptr is None:
+ return None
+
+ return addressof(self._pgresult_ptr.contents) # type: ignore[attr-defined]
+
@property
def status(self) -> int:
- return impl.PQresultStatus(self.pgresult_ptr)
+ return impl.PQresultStatus(self._pgresult_ptr)
@property
def error_message(self) -> bytes:
- return impl.PQresultErrorMessage(self.pgresult_ptr)
+ return impl.PQresultErrorMessage(self._pgresult_ptr)
def error_field(self, fieldcode: int) -> Optional[bytes]:
- return impl.PQresultErrorField(self.pgresult_ptr, fieldcode)
+ return impl.PQresultErrorField(self._pgresult_ptr, fieldcode)
@property
def ntuples(self) -> int:
- return impl.PQntuples(self.pgresult_ptr)
+ return impl.PQntuples(self._pgresult_ptr)
@property
def nfields(self) -> int:
- return impl.PQnfields(self.pgresult_ptr)
+ return impl.PQnfields(self._pgresult_ptr)
def fname(self, column_number: int) -> Optional[bytes]:
- return impl.PQfname(self.pgresult_ptr, column_number)
+ return impl.PQfname(self._pgresult_ptr, column_number)
def ftable(self, column_number: int) -> int:
- return impl.PQftable(self.pgresult_ptr, column_number)
+ return impl.PQftable(self._pgresult_ptr, column_number)
def ftablecol(self, column_number: int) -> int:
- return impl.PQftablecol(self.pgresult_ptr, column_number)
+ return impl.PQftablecol(self._pgresult_ptr, column_number)
def fformat(self, column_number: int) -> int:
- return impl.PQfformat(self.pgresult_ptr, column_number)
+ return impl.PQfformat(self._pgresult_ptr, column_number)
def ftype(self, column_number: int) -> int:
- return impl.PQftype(self.pgresult_ptr, column_number)
+ return impl.PQftype(self._pgresult_ptr, column_number)
def fmod(self, column_number: int) -> int:
- return impl.PQfmod(self.pgresult_ptr, column_number)
+ return impl.PQfmod(self._pgresult_ptr, column_number)
def fsize(self, column_number: int) -> int:
- return impl.PQfsize(self.pgresult_ptr, column_number)
+ return impl.PQfsize(self._pgresult_ptr, column_number)
@property
def binary_tuples(self) -> int:
- return impl.PQbinaryTuples(self.pgresult_ptr)
+ return impl.PQbinaryTuples(self._pgresult_ptr)
def get_value(
self, row_number: int, column_number: int
) -> Optional[bytes]:
length: int = impl.PQgetlength(
- self.pgresult_ptr, row_number, column_number
+ self._pgresult_ptr, row_number, column_number
)
if length:
- v = impl.PQgetvalue(self.pgresult_ptr, row_number, column_number)
+ v = impl.PQgetvalue(self._pgresult_ptr, row_number, column_number)
return string_at(v, length)
else:
- if impl.PQgetisnull(self.pgresult_ptr, row_number, column_number):
+ if impl.PQgetisnull(self._pgresult_ptr, row_number, column_number):
return None
else:
return b""
@property
def nparams(self) -> int:
- return impl.PQnparams(self.pgresult_ptr)
+ return impl.PQnparams(self._pgresult_ptr)
def param_type(self, param_number: int) -> int:
- return impl.PQparamtype(self.pgresult_ptr, param_number)
+ return impl.PQparamtype(self._pgresult_ptr, param_number)
@property
def command_status(self) -> Optional[bytes]:
- return impl.PQcmdStatus(self.pgresult_ptr)
+ return impl.PQcmdStatus(self._pgresult_ptr)
@property
def command_tuples(self) -> Optional[int]:
- rv = impl.PQcmdTuples(self.pgresult_ptr)
+ rv = impl.PQcmdTuples(self._pgresult_ptr)
return int(rv) if rv else None
@property
def oid_value(self) -> int:
- return impl.PQoidValue(self.pgresult_ptr)
+ return impl.PQoidValue(self._pgresult_ptr)
def set_attributes(self, descriptions: List[PGresAttDesc]) -> None:
structs = [
for desc in descriptions
]
array = (impl.PGresAttDesc_struct * len(structs))(*structs) # type: ignore
- rv = impl.PQsetResultAttrs(self.pgresult_ptr, len(structs), array)
+ rv = impl.PQsetResultAttrs(self._pgresult_ptr, len(structs), array)
if rv == 0:
raise e.OperationalError("PQsetResultAttrs failed")
# TODO: might be done without copy (however C does that)
if not isinstance(data, bytes):
data = bytes(data)
- out = impl.PQescapeLiteral(self.conn.pgconn_ptr, data, len(data))
+ out = impl.PQescapeLiteral(self.conn._pgconn_ptr, data, len(data))
if not out:
raise e.OperationalError(
f"escape_literal failed: {error_message(self.conn)} bytes"
if not isinstance(data, bytes):
data = bytes(data)
- out = impl.PQescapeIdentifier(self.conn.pgconn_ptr, data, len(data))
+ out = impl.PQescapeIdentifier(self.conn._pgconn_ptr, data, len(data))
if not out:
raise e.OperationalError(
f"escape_identifier failed: {error_message(self.conn)} bytes"
error = c_int()
out = create_string_buffer(len(data) * 2 + 1)
impl.PQescapeStringConn(
- self.conn.pgconn_ptr,
+ self.conn._pgconn_ptr,
pointer(out), # type: ignore
data,
len(data),
if self.conn:
self.conn._ensure_pgconn()
out = impl.PQescapeByteaConn(
- self.conn.pgconn_ptr,
+ self.conn._pgconn_ptr,
data,
len(data),
pointer(t_cast(c_ulong, len_out)),