import ctypes.util
from ctypes import Structure, CFUNCTYPE, POINTER
from ctypes import c_char, c_char_p, c_int, c_size_t, c_ubyte, c_uint, c_void_p
-from typing import List, Optional, Tuple
+from typing import Any, List, Optional, Tuple
from .misc import find_libpq_full_path
from ..errors import NotSupportedError
f" {libpq_version} available instead"
)
- return _PQhostaddr(pgconn)
+ rv: bytes = _PQhostaddr(pgconn)
+ return rv
PQport = pq.PQport
"PQclosePrepared requires libpq from PostgreSQL 17,"
f" {libpq_version} available instead"
)
- return _PQclosePrepared(pgconn, name)
+ rv: int = _PQclosePrepared(pgconn, name)
+ return rv
def PQclosePortal(pgconn: PGconn_struct, name: str) -> int:
"PQclosePortal requires libpq from PostgreSQL 17,"
f" {libpq_version} available instead"
)
- return _PQclosePortal(pgconn, name)
+ rv: int = _PQclosePortal(pgconn, name)
+ return rv
PQresultStatus = pq.PQresultStatus
"PQsendClosePrepared requires libpq from PostgreSQL 17,"
f" {libpq_version} available instead"
)
- return _PQsendClosePrepared(pgconn, name)
+ rv: int = _PQsendClosePrepared(pgconn, name)
+ return rv
def PQsendClosePortal(pgconn: PGconn_struct, name: str) -> int:
"PQsendClosePortal requires libpq from PostgreSQL 17,"
f" {libpq_version} available instead"
)
- return _PQsendClosePortal(pgconn, name)
+ rv: int = _PQsendClosePortal(pgconn, name)
+ return rv
PQgetResult = pq.PQgetResult
f" {libpq_version} available instead"
)
- return _PQencryptPasswordConn(pgconn, passwd, user, algorithm)
+ rv: Optional[bytes] = _PQencryptPasswordConn(pgconn, passwd, user, algorithm)
+ return rv
PQmakeEmptyPGresult = pq.PQmakeEmptyPGresult
"PQpipelineStatus requires libpq from PostgreSQL 14,"
f" {libpq_version} available instead"
)
- return _PQpipelineStatus(pgconn)
+ rv: int = _PQpipelineStatus(pgconn)
+ return rv
def PQenterPipelineMode(pgconn: PGconn_struct) -> int:
"PQenterPipelineMode requires libpq from PostgreSQL 14,"
f" {libpq_version} available instead"
)
- return _PQenterPipelineMode(pgconn)
+ rv: int = _PQenterPipelineMode(pgconn)
+ return rv
def PQexitPipelineMode(pgconn: PGconn_struct) -> int:
"PQexitPipelineMode requires libpq from PostgreSQL 14,"
f" {libpq_version} available instead"
)
- return _PQexitPipelineMode(pgconn)
+ rv: int = _PQexitPipelineMode(pgconn)
+ return rv
def PQpipelineSync(pgconn: PGconn_struct) -> int:
"PQpipelineSync requires libpq from PostgreSQL 14,"
f" {libpq_version} available instead"
)
- return _PQpipelineSync(pgconn)
+ rv: int = _PQpipelineSync(pgconn)
+ return rv
def PQsendFlushRequest(pgconn: PGconn_struct) -> int:
"PQsendFlushRequest requires libpq from PostgreSQL 14,"
f" {libpq_version} available instead"
)
- return _PQsendFlushRequest(pgconn)
+ rv: int = _PQsendFlushRequest(pgconn)
+ return rv
# 33.18. SSL Support
def generate_stub() -> None:
import re
- from ctypes import _CFuncPtr # type: ignore
+ from ctypes import _CFuncPtr # type: ignore[attr-defined]
- def type2str(fname, narg, t):
+ def type2str(fname: str, narg: int | None, t: Any) -> str:
if t is None:
return "None"
elif t is c_void_p:
if narg is not None:
return f"Optional[{t.__name__[3:]}]"
else:
- return t.__name__[3:]
+ return str(t.__name__[3:])
elif t.__name__ in ("LP_PQconninfoOption_struct",):
return f"Sequence[{t.__name__[3:]}]"