]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Added async basic functions and test for it
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Mon, 16 Mar 2020 08:12:38 +0000 (21:12 +1300)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Mon, 16 Mar 2020 09:06:05 +0000 (22:06 +1300)
psycopg3/pq/_pq_ctypes.py
psycopg3/pq/pq_ctypes.py
tests/pq/test_async.py [new file with mode: 0644]

index ebd32059010e1a14a3b6869776cf7acf9e1ab303..98b3b744290bf846767a8cfcf06c84034d5d5abd 100644 (file)
@@ -315,6 +315,40 @@ PQparamtype.restype = Oid
 # PQprint: pretty useless
 
 
+# 33.4. Asynchronous Command Processing
+
+PQsendQuery = pq.PQsendQuery
+PQsendQuery.argtypes = [PGconn_ptr, c_char_p]
+PQsendQuery.restype = c_int
+
+# TODO: PQsendQueryParams PQsendPrepare PQsendQueryPrepared
+#       PQsendDescribePrepared PQsendDescribePortal
+
+PQgetResult = pq.PQgetResult
+PQgetResult.argtypes = [PGconn_ptr]
+PQgetResult.restype = PGresult_ptr
+
+PQconsumeInput = pq.PQconsumeInput
+PQconsumeInput.argtypes = [PGconn_ptr]
+PQconsumeInput.restype = c_int
+
+PQisBusy = pq.PQisBusy
+PQisBusy.argtypes = [PGconn_ptr]
+PQisBusy.restype = c_int
+
+PQsetnonblocking = pq.PQsetnonblocking
+PQsetnonblocking.argtypes = [PGconn_ptr, c_int]
+PQsetnonblocking.restype = c_int
+
+PQisnonblocking = pq.PQisnonblocking
+PQisnonblocking.argtypes = [PGconn_ptr]
+PQisnonblocking.restype = c_int
+
+PQflush = pq.PQflush
+PQflush.argtypes = [PGconn_ptr]
+PQflush.restype == c_int
+
+
 # 33.11. Miscellaneous Functions
 
 PQfreemem = pq.PQfreemem
index 27f32f41829f77974c7526081886b3168dafb64c..7502f25b88cdab44a30c567716e115ca4e9d005c 100644 (file)
@@ -183,6 +183,13 @@ class PGconn:
             raise MemoryError("couldn't allocate PGresult")
         return PGresult(rv)
 
+    def send_query(self, command):
+        if not isinstance(command, bytes):
+            raise TypeError(
+                "bytes expected, got %s instead" % type(command).__name__
+            )
+        return impl.PQsendQuery(self.pgconn_ptr, command)
+
     def exec_params(
         self,
         command,
@@ -325,6 +332,25 @@ class PGconn:
             raise MemoryError("couldn't allocate PGresult")
         return PGresult(rv)
 
+    def get_result(self):
+        rv = impl.PQgetResult(self.pgconn_ptr)
+        return PGresult(rv) if rv else None
+
+    def consume_input(self):
+        return impl.PQconsumeInput(self.pgconn_ptr)
+
+    def is_busy(self):
+        return impl.PQisBusy(self.pgconn_ptr)
+
+    def is_non_blocking(self):
+        return impl.PQisnonblocking(self.pgconn_ptr)
+
+    def set_non_blocking(self, arg):
+        return impl.PQsetnonblocking(self.pgconn_ptr, arg)
+
+    def flush(self):
+        return impl.PQflush(self.pgconn_ptr)
+
 
 class PGresult:
     __slots__ = ("pgresult_ptr",)
diff --git a/tests/pq/test_async.py b/tests/pq/test_async.py
new file mode 100644 (file)
index 0000000..be2971c
--- /dev/null
@@ -0,0 +1,53 @@
+from select import select
+
+
+def test_send_query(pq, pgconn):
+    # This test shows how to process an async query in all its glory
+    pgconn.set_non_blocking(1)
+
+    # Long query to make sure we have to wait on send
+    pgconn.send_query(
+        b"/* %s */ select pg_sleep(0.01); select 1 as foo;"
+        % (b"x" * 1_000_000)
+    )
+
+    # send loop
+    waited_on_send = 0
+    while 1:
+        f = pgconn.flush()
+        assert f != -1
+        if f == 0:
+            break
+
+        waited_on_send += 1
+
+        rl, wl, xl = select([pgconn.socket], [pgconn.socket], [])
+        assert not (rl and wl)
+        if wl:
+            continue  # call flush again()
+        if rl:
+            assert pgconn.consume_input() == 1, pgconn.error_message
+            continue
+
+    assert waited_on_send
+
+    # read loop
+    results = []
+    while 1:
+        assert pgconn.consume_input() == 1, pgconn.error_message
+        if pgconn.is_busy():
+            select([pgconn.socket], [], [])
+            continue
+        res = pgconn.get_result()
+        if res is None:
+            break
+        assert res.status == pq.ExecStatus.PGRES_TUPLES_OK
+        results.append(res)
+
+    assert len(results) == 2
+    assert results[0].nfields == 1
+    assert results[0].fname(0) == b"pg_sleep"
+    assert results[0].get_value(0, 0) == b""
+    assert results[1].nfields == 1
+    assert results[1].fname(0) == b"foo"
+    assert results[1].get_value(0, 0) == b"1"