import ctypes
import ctypes.util
-from ctypes import Structure, POINTER
+from ctypes import Structure, CFUNCTYPE, POINTER
from ctypes import c_char, c_char_p, c_int, c_size_t, c_ubyte, c_uint, c_void_p
from typing import List, Tuple
PQmakeEmptyPGresult.restype = PGresult_ptr
+# 33.12. Notice Processing
+
+PQnoticeReceiver = CFUNCTYPE(None, c_void_p, PGresult_ptr)
+
+PQsetNoticeReceiver = pq.PQsetNoticeReceiver
+PQsetNoticeReceiver.argtypes = [PGconn_ptr, PQnoticeReceiver, c_void_p]
+PQsetNoticeReceiver.restype = PQnoticeReceiver
+
+
def generate_stub() -> None:
import re
from ctypes import _CFuncPtr
# Copyright (C) 2020 The Psycopg Team
-from typing import Any, Optional, Sequence, NewType
+from typing import Any, Callable, Optional, Sequence, NewType
from ctypes import Array, pointer
from ctypes import c_char, c_char_p, c_int, c_ubyte, c_uint, c_ulong
arg6: Optional[Array[c_int]],
arg7: int,
) -> int: ...
+def PQsetNoticeReceiver(
+ arg1: PGconn_struct, arg2: Callable[[Any], PGresult_struct], arg3: Any
+) -> Callable[[Any], PGresult_struct]: ...
# fmt: off
# autogenerated: start
PGresult *PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
int PQlibVersion()
+ # 33.12. Notice Processing
+ ctypedef void (*PQnoticeReceiver)(void *arg, const PGresult *res)
+ PQnoticeReceiver PQsetNoticeReceiver(
+ PGconn *conn, PQnoticeReceiver prog, void *arg)
# Copyright (C) 2020 The Psycopg Team
+import logging
+from weakref import ref
+
from ctypes import Array, pointer, string_at
from ctypes import c_char_p, c_int, c_size_t, c_ulong
from typing import Any, Callable, List, Optional, Sequence
__impl__ = "ctypes"
+logger = logging.getLogger("psycopg3")
+
def version() -> int:
return impl.PQlibVersion()
class PGconn:
- __slots__ = ("pgconn_ptr",)
+ __slots__ = (
+ "pgconn_ptr",
+ "notice_callback",
+ "_notice_receiver",
+ "__weakref__",
+ )
def __init__(self, pgconn_ptr: impl.PGconn_struct):
self.pgconn_ptr: Optional[impl.PGconn_struct] = pgconn_ptr
+ self.notice_callback: Optional[Callable[..., None]] = None
+
+ w = ref(self)
+
+ @impl.PQnoticeReceiver # type: ignore
+ def notice_receiver(
+ arg: Any, result_ptr: impl.PGresult_struct
+ ) -> None:
+ pgconn = w()
+ if pgconn is None or pgconn.notice_callback is None:
+ return
+
+ res = PGresult(result_ptr)
+ try:
+ pgconn.notice_callback(res)
+ except Exception as e:
+ logger.exception("error in notice receiver: %s", e)
+
+ res.pgresult_ptr = None # avoid destroying the pgresult_ptr
+
+ impl.PQsetNoticeReceiver(pgconn_ptr, notice_receiver, None)
+ self._notice_receiver = notice_receiver
def __del__(self) -> None:
self.finish()
@staticmethod
cdef PGconn _from_ptr(impl.PGconn *ptr)
+ cdef public object notice_callback
+
cdef int _ensure_pgconn(self) except 0
cdef char *_call_bytes(self, conn_bytes_f func) except NULL
cdef int _call_int(self, conn_int_f func) except -1
from cpython.mem cimport PyMem_Malloc, PyMem_Free
+import logging
from typing import List, Optional, Sequence
from psycopg3.pq cimport libpq as impl
__impl__ = 'c'
+logger = logging.getLogger('psycopg3')
+
def version():
return impl.PQlibVersion()
+cdef void notice_receiver(void *arg, const impl.PGresult *res_ptr):
+ cdef PGconn pgconn = <object>arg
+ if pgconn.notice_callback is None:
+ return
+
+ cdef PGresult res = PGresult._from_ptr(<impl.PGresult *>res_ptr)
+ try:
+ pgconn.notice_callback(res)
+ except Exception as e:
+ logger.exception("error in notice receiver: %s", e)
+
+ res.pgresult_ptr = NULL # avoid destroying the pgresult_ptr
+
+
cdef class PGconn:
@staticmethod
cdef PGconn _from_ptr(impl.PGconn *ptr):
cdef PGconn rv = PGconn.__new__(PGconn)
rv.pgconn_ptr = ptr
+
+ impl.PQsetNoticeReceiver(ptr, notice_receiver, <void *>rv)
return rv
def __cinit__(self):
rv = out[:len_out]
impl.PQfreemem(out)
return rv
-
# Copyright (C) 2020 The Psycopg Team
-from typing import Any, List, Optional, Sequence, TYPE_CHECKING
+from typing import Any, Callable, List, Optional, Sequence, TYPE_CHECKING
from typing_extensions import Protocol
from .enums import (
class PGconn(Protocol):
+
+ notice_callback: Optional[Callable[["PGresult"], None]]
+
@classmethod
def connect(cls, conninfo: bytes) -> "PGconn":
...
import os
+import logging
from select import select
import pytest
import psycopg3
+import psycopg3.generators
def test_connectdb(pq, dsn):
res = pgconn.make_empty_result(pq.ExecStatus.FATAL_ERROR)
assert res.status == pq.ExecStatus.FATAL_ERROR
assert res.error_message == b""
+
+
+def test_notice_nohandler(pq, pgconn):
+ res = pgconn.exec_(
+ b"""
+do $$
+begin
+ raise notice 'hello notice';
+end
+$$ language plpgsql
+ """
+ )
+ assert res.status == pq.ExecStatus.COMMAND_OK
+
+
+def test_notice(pq, pgconn):
+ msgs = []
+
+ def callback(res):
+ assert res.status == pq.ExecStatus.NONFATAL_ERROR
+ msgs.append(res.error_field(pq.DiagnosticField.MESSAGE_PRIMARY))
+
+ pgconn.notice_callback = callback
+ res = pgconn.exec_(
+ b"""
+do $$
+begin
+ raise notice 'hello notice';
+end
+$$ language plpgsql
+ """
+ )
+
+ assert res.status == pq.ExecStatus.COMMAND_OK
+ assert msgs and msgs[0] == b"hello notice"
+
+
+def test_notice_error(pq, pgconn, caplog):
+ caplog.set_level(logging.WARNING, logger="psycopg3")
+
+ def callback(res):
+ raise Exception("hello error")
+
+ pgconn.notice_callback = callback
+ res = pgconn.exec_(
+ b"""
+do $$
+begin
+ raise notice 'hello notice';
+end
+$$ language plpgsql
+ """
+ )
+
+ assert res.status == pq.ExecStatus.COMMAND_OK
+ assert len(caplog.records) == 1
+ rec = caplog.records[0]
+ assert rec.levelno == logging.ERROR
+ assert "hello error" in rec.message