]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
refactor: move common notifies processing code into a common function
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Wed, 2 Nov 2022 11:20:53 +0000 (12:20 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 4 Nov 2022 16:29:04 +0000 (17:29 +0100)
psycopg/psycopg/generators.py
psycopg_c/psycopg_c/_psycopg/generators.pyx

index 37cba131acf522ec425d7b3ffb2c3811264dcb5b..584fe47bf70e768ba4c4e673cb01a22766452dcd 100644 (file)
@@ -173,13 +173,7 @@ def _fetch(pgconn: PGconn) -> PQGen[Optional[PGresult]]:
                 break
             yield WAIT_R
 
-    # Consume notifies
-    while True:
-        n = pgconn.notifies()
-        if not n:
-            break
-        if pgconn.notify_handler:
-            pgconn.notify_handler(n)
+    _consume_notifies(pgconn)
 
     return pgconn.get_result()
 
@@ -199,12 +193,7 @@ def _pipeline_communicate(
 
         if ready & READY_R:
             pgconn.consume_input()
-            while True:
-                n = pgconn.notifies()
-                if not n:
-                    break
-                if pgconn.notify_handler:
-                    pgconn.notify_handler(n)
+            _consume_notifies(pgconn)
 
             res: List[PGresult] = []
             while not pgconn.is_busy():
@@ -229,6 +218,16 @@ def _pipeline_communicate(
     return results
 
 
+def _consume_notifies(pgconn: PGconn) -> None:
+    # Consume notifies
+    while True:
+        n = pgconn.notifies()
+        if not n:
+            break
+        if pgconn.notify_handler:
+            pgconn.notify_handler(n)
+
+
 def notifies(pgconn: PGconn) -> PQGen[List[pq.PGnotify]]:
     yield WAIT_R
     pgconn.consume_input()
index e45314a13407032ee7a66b2d9413bbb7621943f3..67bdd13f4c329cacf64e5435a1635107a59accd0 100644 (file)
@@ -158,9 +158,7 @@ def fetch(pq.PGconn pgconn) -> PQGen[Optional[PGresult]]:
     Return a result from the database (whether success or error).
     """
     cdef libpq.PGconn *pgconn_ptr = pgconn._pgconn_ptr
-    cdef libpq.PGnotify *notify
     cdef int cires, ibres = 0
-    cdef object notify_handler = pgconn.notify_handler
     cdef libpq.PGresult *pgres
 
     if libpq.PQisBusy(pgconn_ptr):
@@ -178,21 +176,7 @@ def fetch(pq.PGconn pgconn) -> PQGen[Optional[PGresult]]:
                 break
             yield WAIT_R
 
-    # Consume notifies
-    if notify_handler is not None:
-        while True:
-            pynotify = pgconn.notifies()
-            if pynotify is None:
-                break
-            PyObject_CallFunctionObjArgs(
-                notify_handler, <PyObject *>pynotify, NULL
-            )
-    else:
-        while True:
-            notify = libpq.PQnotifies(pgconn_ptr)
-            if notify is NULL:
-                break
-            libpq.PQfreemem(notify)
+    _consume_notifies(pgconn)
 
     pgres = libpq.PQgetResult(pgconn_ptr)
     if pgres is NULL:
@@ -209,8 +193,6 @@ def pipeline_communicate(
     Return a list results, including single PIPELINE_SYNC elements.
     """
     cdef libpq.PGconn *pgconn_ptr = pgconn._pgconn_ptr
-    cdef object notify_handler = pgconn.notify_handler
-    cdef libpq.PGnotify *notify
     cdef int cires
     cdef int status
     cdef int ready
@@ -229,20 +211,7 @@ def pipeline_communicate(
                 raise e.OperationalError(
                     f"consuming input failed: {error_message(pgconn)}")
 
-            if notify_handler is not None:
-                while True:
-                    pynotify = pgconn.notifies()
-                    if pynotify is None:
-                        break
-                    PyObject_CallFunctionObjArgs(
-                        notify_handler, <PyObject *>pynotify, NULL
-                    )
-            else:
-                while True:
-                    notify = libpq.PQnotifies(pgconn_ptr)
-                    if notify is NULL:
-                        break
-                    libpq.PQfreemem(notify)
+            _consume_notifies(pgconn)
 
             res: List[PGresult] = []
             while not libpq.PQisBusy(pgconn_ptr):
@@ -270,3 +239,27 @@ def pipeline_communicate(
             commands.popleft()()
 
     return results
+
+
+cdef int _consume_notifies(pq.PGconn pgconn) except -1:
+    cdef object notify_handler = pgconn.notify_handler
+    cdef libpq.PGconn *pgconn_ptr
+    cdef libpq.PGnotify *notify
+
+    if notify_handler is not None:
+        while True:
+            pynotify = pgconn.notifies()
+            if pynotify is None:
+                break
+            PyObject_CallFunctionObjArgs(
+                notify_handler, <PyObject *>pynotify, NULL
+            )
+    else:
+        pgconn_ptr = pgconn._pgconn_ptr
+        while True:
+            notify = libpq.PQnotifies(pgconn_ptr)
+            if notify is NULL:
+                break
+            libpq.PQfreemem(notify)
+
+    return 0