PQinitOpenSSL.restype = None
+# 32.20. OAuth support
+
+
+class PGpromptOAuthDevice_struct(Structure):
+ _fields_ = [
+ ("verification_uri", c_char_p),
+ ("user_code", c_char_p),
+ ("verification_uri_complete", c_char_p),
+ ("expires_in", c_int),
+ ]
+
+
+class PGoauthBearerRequest_struct(Structure):
+ # Struct forward declaration to allow to define the callbacks
+ pass
+
+
+PGpromptOAuthDevice_ptr = POINTER(PGpromptOAuthDevice_struct)
+PGoauthBearerRequest_ptr = POINTER(PGoauthBearerRequest_struct)
+
+
+# TODO: not sure - uintptr_t on win32 in libpq source
+SOCKTYPE = c_void_p if sys.platform == "win32" else c_int
+
+PGoauthBearerRequestAsyncCB = CFUNCTYPE(
+ c_int, PGconn_ptr, PGoauthBearerRequest_ptr, POINTER(SOCKTYPE)
+)
+PGoauthBearerRequestCleanupCB = CFUNCTYPE(
+ None, PGconn_ptr, POINTER(PGoauthBearerRequest_struct)
+)
+
+PGoauthBearerRequest_struct._fields_ = [
+ ("openid_configuration", c_char_p),
+ ("scope", c_char_p),
+ ("async", PGoauthBearerRequestAsyncCB),
+ ("cleanup", PGoauthBearerRequestCleanupCB),
+ ("token", c_char_p),
+ ("user", c_void_p),
+]
+
+PQauthDataHook_type = CFUNCTYPE(c_int, c_int, PGconn_ptr, c_void_p)
+
+if libpq_version >= 180000:
+ PQsetAuthDataHook = pq.PQsetAuthDataHook
+ PQsetAuthDataHook.argtypes = [PQauthDataHook_type]
+ PQsetAuthDataHook.restype = None
+
+ PQgetAuthDataHook = pq.PQgetAuthDataHook
+ PQgetAuthDataHook.argtypes = []
+ PQgetAuthDataHook.restype = PQauthDataHook_type
+
+ PQdefaultAuthDataHook = pq.PQdefaultAuthDataHook
+ PQdefaultAuthDataHook.argtypes = [c_int, PGconn_ptr, c_void_p]
+ PQdefaultAuthDataHook.restype = c_int
+else:
+ PQsetAuthDataHook = not_supported_before("PQsetAuthDataHook", 180000)
+ PQgetAuthDataHook = not_supported_before("PQgetAuthDataHook", 180000)
+ PQdefaultAuthDataHook = not_supported_before("PQdefaultAuthDataHook", 180000)
+
+
def generate_stub() -> None:
import re
from ctypes import _CFuncPtr # type: ignore[attr-defined]
def PQexitPipelineMode(pgconn: PGconn_struct | None) -> int: ...
def PQpipelineSync(pgconn: PGconn_struct | None) -> int: ...
def PQsendFlushRequest(pgconn: PGconn_struct | None) -> int: ...
+def PQsetAuthDataHook(
+ hook: Callable[[int, PGconn_struct | None, Any], int],
+) -> None: ...
+def PQgetAuthDataHook() -> Callable[[int, PGconn_struct | None, Any], int]: ...
+def PQdefaultAuthDataHook(
+ type: int, pgconn: PGconn_struct | None, data: Any
+) -> int: ...
# Autogenerated section.
# In order to refresh, run: