]
+class PGnotify_struct(Structure):
+ _fields_ = [
+ ("relname", c_char_p),
+ ("be_pid", c_int),
+ ("extra", c_char_p),
+ ]
+
+
PGconn_ptr = POINTER(PGconn_struct)
PGresult_ptr = POINTER(PGresult_struct)
PQconninfoOption_ptr = POINTER(PQconninfoOption_struct)
+PGnotify_ptr = POINTER(PGnotify_struct)
# Function definitions as explained in PostgreSQL 12 documentation
PQflush.restype == c_int
+# 33.8. Asynchronous Notification
+
+PQnotifies = pq.PQnotifies
+PQnotifies.argtypes = [PGconn_ptr]
+PQnotifies.restype = PGnotify_ptr
+
+
# 33.11. Miscellaneous Functions
PQfreemem = pq.PQfreemem
dispchar: bytes
dispsize: int
+class PGnotify_struct:
+ be_pid: int
+ relname: bytes
+ extra: bytes
+
def PQhostaddr(arg1: Optional[PGconn_struct]) -> bytes: ...
def PQerrorMessage(arg1: Optional[PGconn_struct]) -> bytes: ...
def PQresultErrorMessage(arg1: Optional[PGresult_struct]) -> bytes: ...
arg1: PGconn_struct, arg2: Callable[[Any], PGresult_struct], arg3: Any
) -> Callable[[Any], PGresult_struct]: ...
+# TODO: Ignoring type as getting an error on mypy/ctypes:
+# Type argument "psycopg3.pq._pq_ctypes.PGnotify_struct" of "pointer" must be
+# a subtype of "ctypes._CData"
+def PQnotifies(
+ arg1: Optional[PGconn_struct],
+) -> Optional[pointer[PGnotify_struct]]: ... # type: ignore
+
# fmt: off
# autogenerated: start
def PQlibVersion() -> int: ...
ctypedef struct PGresult:
pass
+ ctypedef struct PGnotify:
+ char *relname
+ int be_pid
+ char *extra
+
ctypedef struct PQconninfoOption:
char *keyword
char *envvar
int PQisnonblocking(const PGconn *conn)
int PQflush(PGconn *conn)
+ # 33.8. Asynchronous Notification
+ PGnotify *PQnotifies(PGconn *conn)
+
# 33.11. Miscellaneous Functions
void PQfreemem(void *ptr)
void PQconninfoFree(PQconninfoOption *connOptions)
# Copyright (C) 2020 The Psycopg Team
-from collections import namedtuple
-from typing import cast, Union
+from typing import cast, NamedTuple, Optional, Union
from ..errors import OperationalError
from .enums import DiagnosticField
pass
-ConninfoOption = namedtuple(
- "ConninfoOption", "keyword envvar compiled val label dispchar dispsize"
-)
+class PGnotify(NamedTuple):
+ relname: bytes
+ be_pid: int
+ extra: bytes
+
+
+class ConninfoOption(NamedTuple):
+ keyword: bytes
+ envvar: Optional[bytes]
+ compiled: Optional[bytes]
+ val: Optional[bytes]
+ label: bytes
+ dispchar: bytes
+ dispsize: int
def error_message(obj: Union[PGconn, PGresult]) -> str:
DiagnosticField,
Format,
)
-from .misc import error_message, ConninfoOption, PQerror
+from .misc import error_message, PGnotify, ConninfoOption, PQerror
from . import _pq_ctypes as impl
if TYPE_CHECKING:
raise PQerror(f"flushing failed: {error_message(self)}")
return rv
+ def notifies(self) -> Optional["PGnotify"]:
+ ptr = impl.PQnotifies(self.pgconn_ptr)
+ if ptr:
+ c = ptr.contents
+ return PGnotify(c.relname, c.be_pid, c.extra)
+ impl.PQfreemem(ptr)
+ else:
+ return None
+
def make_empty_result(self, exec_status: ExecStatus) -> "PGresult":
rv = impl.PQmakeEmptyPGresult(self.pgconn_ptr, exec_status)
if not rv:
from psycopg3.pq.libpq cimport Oid
from psycopg3.errors import OperationalError
-from psycopg3.pq.misc import error_message, ConninfoOption, PQerror
+from psycopg3.pq.misc import error_message, PGnotify, ConninfoOption, PQerror
from psycopg3.pq.enums import (
ConnStatus,
PollingStatus,
)
return rv
+ def notifies(self) -> Optional[PGnotify]:
+ cdef impl.PGnotify *ptr = impl.PQnotifies(self.pgconn_ptr)
+ if ptr:
+ ret = PGnotify(ptr.relname, ptr.be_pid, ptr.extra)
+ impl.PQfreemem(ptr)
+ return ret
+ else:
+ return None
+
def make_empty_result(self, exec_status: ExecStatus) -> PGresult:
cdef impl.PGresult *rv = impl.PQmakeEmptyPGresult(
self.pgconn_ptr, exec_status)
assert res.error_message == b""
+def test_notify(pgconn):
+ assert pgconn.notifies() is None
+
+ pgconn.exec_(b"listen foo")
+ pgconn.exec_(b"listen bar")
+ pgconn.exec_(b"notify foo, '1'")
+ pgconn.exec_(b"notify bar, '2'")
+ pgconn.exec_(b"notify foo, '3'")
+
+ n = pgconn.notifies()
+ assert n.relname == b"foo"
+ assert n.be_pid == pgconn.backend_pid
+ assert n.extra == b"1"
+
+ n = pgconn.notifies()
+ assert n.relname == b"bar"
+ assert n.be_pid == pgconn.backend_pid
+ assert n.extra == b"2"
+
+ n = pgconn.notifies()
+ assert n.relname == b"foo"
+ assert n.be_pid == pgconn.backend_pid
+ assert n.extra == b"3"
+
+ assert pgconn.notifies() is None
+
+
def test_notice_nohandler(pgconn):
pgconn.exec_(b"set client_min_messages to notice")
res = pgconn.exec_(