PQdescribePortal.argtypes = [PGconn_ptr, c_char_p]
PQdescribePortal.restype = PGresult_ptr
+_PQclosePrepared = None
+_PQclosePortal = None
+
+if libpq_version >= 170000:
+ _PQclosePrepared = pq.PQclosePrepared
+ _PQclosePrepared.argtypes = [PGconn_ptr, c_char_p]
+ _PQclosePrepared.restype = PGresult_ptr
+
+ _PQclosePortal = pq.PQclosePortal
+ _PQclosePortal.argtypes = [PGconn_ptr, c_char_p]
+ _PQclosePortal.restype = PGresult_ptr
+
+
+def PQclosePrepared(pgconn: PGconn_struct) -> int:
+ if not _PQclosePrepared:
+ raise NotSupportedError(
+ "PQclosePrepared requires libpq from PostgreSQL 17,"
+ f" {libpq_version} available instead"
+ )
+ return _PQclosePrepared(pgconn)
+
+
+def PQclosePortal(pgconn: PGconn_struct) -> int:
+ if not _PQclosePortal:
+ raise NotSupportedError(
+ "PQclosePortal requires libpq from PostgreSQL 17,"
+ f" {libpq_version} available instead"
+ )
+ return _PQclosePortal(pgconn)
+
+
PQresultStatus = pq.PQresultStatus
PQresultStatus.argtypes = [PGresult_ptr]
PQresultStatus.restype = c_int
PQsendDescribePortal.argtypes = [PGconn_ptr, c_char_p]
PQsendDescribePortal.restype = c_int
+_PQsendClosePrepared = None
+_PQsendClosePortal = None
+
+if libpq_version >= 170000:
+ _PQsendClosePrepared = pq.PQsendClosePrepared
+ _PQsendClosePrepared.argtypes = [PGconn_ptr, c_char_p]
+ _PQsendClosePrepared.restype = c_int
+
+ _PQsendClosePortal = pq.PQsendClosePortal
+ _PQsendClosePortal.argtypes = [PGconn_ptr, c_char_p]
+ _PQsendClosePortal.restype = c_int
+
+
+def PQsendClosePrepared(pgconn: PGconn_struct) -> int:
+ if not _PQsendClosePrepared:
+ raise NotSupportedError(
+ "PQsendClosePrepared requires libpq from PostgreSQL 17,"
+ f" {libpq_version} available instead"
+ )
+ return _PQsendClosePrepared(pgconn)
+
+
+def PQsendClosePortal(pgconn: PGconn_struct) -> int:
+ if not _PQsendClosePortal:
+ raise NotSupportedError(
+ "PQsendClosePortal requires libpq from PostgreSQL 17,"
+ f" {libpq_version} available instead"
+ )
+ return _PQsendClosePortal(pgconn)
+
+
PQgetResult = pq.PQgetResult
PQgetResult.argtypes = [PGconn_ptr]
PQgetResult.restype = PGresult_ptr
def PQexecParams(arg1: Optional[PGconn_struct], arg2: bytes, arg3: int, arg4: _Pointer[c_uint], arg5: _Pointer[c_char_p], arg6: _Pointer[c_int], arg7: _Pointer[c_int], arg8: int) -> PGresult_struct: ...
def PQdescribePrepared(arg1: Optional[PGconn_struct], arg2: bytes) -> PGresult_struct: ...
def PQdescribePortal(arg1: Optional[PGconn_struct], arg2: bytes) -> PGresult_struct: ...
+def PQclosePrepared(arg1: Optional[PGconn_struct], arg2: bytes) -> PGresult_struct: ...
+def PQclosePortal(arg1: Optional[PGconn_struct], arg2: bytes) -> PGresult_struct: ...
def PQresultStatus(arg1: Optional[PGresult_struct]) -> int: ...
def PQresultErrorField(arg1: Optional[PGresult_struct], arg2: int) -> Optional[bytes]: ...
def PQclear(arg1: Optional[PGresult_struct]) -> None: ...
def PQsendQueryParams(arg1: Optional[PGconn_struct], arg2: bytes, arg3: int, arg4: _Pointer[c_uint], arg5: _Pointer[c_char_p], arg6: _Pointer[c_int], arg7: _Pointer[c_int], arg8: int) -> int: ...
def PQsendDescribePrepared(arg1: Optional[PGconn_struct], arg2: bytes) -> int: ...
def PQsendDescribePortal(arg1: Optional[PGconn_struct], arg2: bytes) -> int: ...
+def PQsendClosePrepared(arg1: Optional[PGconn_struct], arg2: bytes) -> int: ...
+def PQsendClosePortal(arg1: Optional[PGconn_struct], arg2: bytes) -> int: ...
def PQgetResult(arg1: Optional[PGconn_struct]) -> PGresult_struct: ...
def PQconsumeInput(arg1: Optional[PGconn_struct]) -> int: ...
def PQisBusy(arg1: Optional[PGconn_struct]) -> int: ...
def send_describe_portal(self, name: bytes) -> None:
...
+ def close_prepared(self, name: bytes) -> "PGresult":
+ ...
+
+ def send_close_prepared(self, name: bytes) -> None:
+ ...
+
+ def close_portal(self, name: bytes) -> "PGresult":
+ ...
+
+ def send_close_portal(self, name: bytes) -> None:
+ ...
+
def get_result(self) -> Optional["PGresult"]:
...
f"sending describe portal failed: {error_message(self)}"
)
+ def close_prepared(self, name: bytes) -> "PGresult":
+ if not isinstance(name, bytes):
+ raise TypeError(f"'name' must be bytes, got {type(name)} instead")
+ self._ensure_pgconn()
+ rv = impl.PQclosePrepared(self._pgconn_ptr, name)
+ if not rv:
+ raise e.OperationalError(f"close prepared failed: {error_message(self)}")
+ return PGresult(rv)
+
+ def send_close_prepared(self, name: bytes) -> None:
+ if not isinstance(name, bytes):
+ raise TypeError(f"bytes expected, got {type(name)} instead")
+ self._ensure_pgconn()
+ if not impl.PQsendClosePrepared(self._pgconn_ptr, name):
+ raise e.OperationalError(
+ f"sending close prepared failed: {error_message(self)}"
+ )
+
+ def close_portal(self, name: bytes) -> "PGresult":
+ if not isinstance(name, bytes):
+ raise TypeError(f"'name' must be bytes, got {type(name)} instead")
+ self._ensure_pgconn()
+ rv = impl.PQclosePortal(self._pgconn_ptr, name)
+ if not rv:
+ raise e.OperationalError(f"close portal failed: {error_message(self)}")
+ return PGresult(rv)
+
+ def send_close_portal(self, name: bytes) -> None:
+ if not isinstance(name, bytes):
+ raise TypeError(f"bytes expected, got {type(name)} instead")
+ self._ensure_pgconn()
+ if not impl.PQsendClosePortal(self._pgconn_ptr, name):
+ raise e.OperationalError(
+ f"sending close portal failed: {error_message(self)}"
+ )
+
def get_result(self) -> Optional["PGresult"]:
rv = impl.PQgetResult(self._pgconn_ptr)
return PGresult(rv) if rv else None
int resultFormat) nogil
PGresult *PQdescribePrepared(PGconn *conn, const char *stmtName) nogil
PGresult *PQdescribePortal(PGconn *conn, const char *portalName) nogil
+ PGresult *PQclosePrepared(PGconn *conn, const char *stmtName) nogil
+ PGresult *PQclosePortal(PGconn *conn, const char *portalName) nogil
ExecStatusType PQresultStatus(const PGresult *res) nogil
# PQresStatus: not needed, we have pretty enums
char *PQresultErrorMessage(const PGresult *res) nogil
int resultFormat) nogil
int PQsendDescribePrepared(PGconn *conn, const char *stmtName) nogil
int PQsendDescribePortal(PGconn *conn, const char *portalName) nogil
+ int PQsendClosePrepared(PGconn *conn, const char *stmtName) nogil
+ int PQsendClosePortal(PGconn *conn, const char *portalName) nogil
PGresult *PQgetResult(PGconn *conn) nogil
int PQconsumeInput(PGconn *conn) nogil
int PQisBusy(PGconn *conn) nogil
#define PQsendFlushRequest(conn) 0
#define PQsetTraceFlags(conn, stream) do {} while (0)
#endif
+
+#if PG_VERSION_NUM < 170000
+#define PQclosePrepared(conn, name) NULL
+#define PQclosePortal(conn, name) NULL
+#define PQsendClosePrepared(conn, name) 0
+#define PQsendClosePortal(conn, name) 0
+#endif
"""
f"sending describe prepared failed: {error_message(self)}"
)
+ def close_prepared(self, const char *name) -> PGresult:
+ if libpq.PG_VERSION_NUM < 170000:
+ raise e.NotSupportedError(
+ f"PQclosePrepared requires libpq from PostgreSQL 17,"
+ f" {libpq.PG_VERSION_NUM} available instead"
+ )
+ _ensure_pgconn(self)
+ cdef libpq.PGresult *rv = libpq.PQclosePrepared(self._pgconn_ptr, name)
+ if rv is NULL:
+ raise e.OperationalError(
+ f"close prepared failed: {error_message(self)}"
+ )
+ return PGresult._from_ptr(rv)
+
+ def send_close_prepared(self, const char *name) -> None:
+ if libpq.PG_VERSION_NUM < 170000:
+ raise e.NotSupportedError(
+ f"PQsendClosePrepared requires libpq from PostgreSQL 17,"
+ f" {libpq.PG_VERSION_NUM} available instead"
+ )
+ _ensure_pgconn(self)
+ cdef int rv = libpq.PQsendClosePrepared(self._pgconn_ptr, name)
+ if not rv:
+ raise e.OperationalError(
+ f"sending close prepared failed: {error_message(self)}"
+ )
+
+ def close_portal(self, const char *name) -> PGresult:
+ if libpq.PG_VERSION_NUM < 170000:
+ raise e.NotSupportedError(
+ f"PQclosePortal requires libpq from PostgreSQL 17,"
+ f" {libpq.PG_VERSION_NUM} available instead"
+ )
+ _ensure_pgconn(self)
+ cdef libpq.PGresult *rv = libpq.PQclosePortal(self._pgconn_ptr, name)
+ if rv is NULL:
+ raise e.OperationalError(
+ f"close prepared failed: {error_message(self)}"
+ )
+ return PGresult._from_ptr(rv)
+
+ def send_close_portal(self, const char *name) -> None:
+ if libpq.PG_VERSION_NUM < 170000:
+ raise e.NotSupportedError(
+ f"PQsendClosePortal requires libpq from PostgreSQL 17,"
+ f" {libpq.PG_VERSION_NUM} available instead"
+ )
+ _ensure_pgconn(self)
+ cdef int rv = libpq.PQsendClosePortal(self._pgconn_ptr, name)
+ if not rv:
+ raise e.OperationalError(
+ f"sending close prepared failed: {error_message(self)}"
+ )
+
def get_result(self) -> Optional["PGresult"]:
cdef libpq.PGresult *pgresult = libpq.PQgetResult(self._pgconn_ptr)
if pgresult is NULL: