From: Daniele Varrazzo Date: Wed, 2 Nov 2022 11:20:53 +0000 (+0100) Subject: refactor: move common notifies processing code into a common function X-Git-Tag: pool-3.1.4~3 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b4a0179faeb9cb4052ef70611f446066dffe308f;p=thirdparty%2Fpsycopg.git refactor: move common notifies processing code into a common function --- diff --git a/psycopg/psycopg/generators.py b/psycopg/psycopg/generators.py index 37cba131a..584fe47bf 100644 --- a/psycopg/psycopg/generators.py +++ b/psycopg/psycopg/generators.py @@ -173,13 +173,7 @@ def _fetch(pgconn: PGconn) -> PQGen[Optional[PGresult]]: break yield WAIT_R - # Consume notifies - while True: - n = pgconn.notifies() - if not n: - break - if pgconn.notify_handler: - pgconn.notify_handler(n) + _consume_notifies(pgconn) return pgconn.get_result() @@ -199,12 +193,7 @@ def _pipeline_communicate( if ready & READY_R: pgconn.consume_input() - while True: - n = pgconn.notifies() - if not n: - break - if pgconn.notify_handler: - pgconn.notify_handler(n) + _consume_notifies(pgconn) res: List[PGresult] = [] while not pgconn.is_busy(): @@ -229,6 +218,16 @@ def _pipeline_communicate( return results +def _consume_notifies(pgconn: PGconn) -> None: + # Consume notifies + while True: + n = pgconn.notifies() + if not n: + break + if pgconn.notify_handler: + pgconn.notify_handler(n) + + def notifies(pgconn: PGconn) -> PQGen[List[pq.PGnotify]]: yield WAIT_R pgconn.consume_input() diff --git a/psycopg_c/psycopg_c/_psycopg/generators.pyx b/psycopg_c/psycopg_c/_psycopg/generators.pyx index e45314a13..67bdd13f4 100644 --- a/psycopg_c/psycopg_c/_psycopg/generators.pyx +++ b/psycopg_c/psycopg_c/_psycopg/generators.pyx @@ -158,9 +158,7 @@ def fetch(pq.PGconn pgconn) -> PQGen[Optional[PGresult]]: Return a result from the database (whether success or error). """ cdef libpq.PGconn *pgconn_ptr = pgconn._pgconn_ptr - cdef libpq.PGnotify *notify cdef int cires, ibres = 0 - cdef object notify_handler = pgconn.notify_handler cdef libpq.PGresult *pgres if libpq.PQisBusy(pgconn_ptr): @@ -178,21 +176,7 @@ def fetch(pq.PGconn pgconn) -> PQGen[Optional[PGresult]]: break yield WAIT_R - # Consume notifies - if notify_handler is not None: - while True: - pynotify = pgconn.notifies() - if pynotify is None: - break - PyObject_CallFunctionObjArgs( - notify_handler, pynotify, NULL - ) - else: - while True: - notify = libpq.PQnotifies(pgconn_ptr) - if notify is NULL: - break - libpq.PQfreemem(notify) + _consume_notifies(pgconn) pgres = libpq.PQgetResult(pgconn_ptr) if pgres is NULL: @@ -209,8 +193,6 @@ def pipeline_communicate( Return a list results, including single PIPELINE_SYNC elements. """ cdef libpq.PGconn *pgconn_ptr = pgconn._pgconn_ptr - cdef object notify_handler = pgconn.notify_handler - cdef libpq.PGnotify *notify cdef int cires cdef int status cdef int ready @@ -229,20 +211,7 @@ def pipeline_communicate( raise e.OperationalError( f"consuming input failed: {error_message(pgconn)}") - if notify_handler is not None: - while True: - pynotify = pgconn.notifies() - if pynotify is None: - break - PyObject_CallFunctionObjArgs( - notify_handler, pynotify, NULL - ) - else: - while True: - notify = libpq.PQnotifies(pgconn_ptr) - if notify is NULL: - break - libpq.PQfreemem(notify) + _consume_notifies(pgconn) res: List[PGresult] = [] while not libpq.PQisBusy(pgconn_ptr): @@ -270,3 +239,27 @@ def pipeline_communicate( commands.popleft()() return results + + +cdef int _consume_notifies(pq.PGconn pgconn) except -1: + cdef object notify_handler = pgconn.notify_handler + cdef libpq.PGconn *pgconn_ptr + cdef libpq.PGnotify *notify + + if notify_handler is not None: + while True: + pynotify = pgconn.notifies() + if pynotify is None: + break + PyObject_CallFunctionObjArgs( + notify_handler, pynotify, NULL + ) + else: + pgconn_ptr = pgconn._pgconn_ptr + while True: + notify = libpq.PQnotifies(pgconn_ptr) + if notify is NULL: + break + libpq.PQfreemem(notify) + + return 0