From: Daniele Varrazzo Date: Mon, 16 Mar 2020 08:12:38 +0000 (+1300) Subject: Added async basic functions and test for it X-Git-Tag: 3.0.dev0~703 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=9251c698d22d86b9971c8a38891eb32d492cdc37;p=thirdparty%2Fpsycopg.git Added async basic functions and test for it --- diff --git a/psycopg3/pq/_pq_ctypes.py b/psycopg3/pq/_pq_ctypes.py index ebd320590..98b3b7442 100644 --- a/psycopg3/pq/_pq_ctypes.py +++ b/psycopg3/pq/_pq_ctypes.py @@ -315,6 +315,40 @@ PQparamtype.restype = Oid # PQprint: pretty useless +# 33.4. Asynchronous Command Processing + +PQsendQuery = pq.PQsendQuery +PQsendQuery.argtypes = [PGconn_ptr, c_char_p] +PQsendQuery.restype = c_int + +# TODO: PQsendQueryParams PQsendPrepare PQsendQueryPrepared +# PQsendDescribePrepared PQsendDescribePortal + +PQgetResult = pq.PQgetResult +PQgetResult.argtypes = [PGconn_ptr] +PQgetResult.restype = PGresult_ptr + +PQconsumeInput = pq.PQconsumeInput +PQconsumeInput.argtypes = [PGconn_ptr] +PQconsumeInput.restype = c_int + +PQisBusy = pq.PQisBusy +PQisBusy.argtypes = [PGconn_ptr] +PQisBusy.restype = c_int + +PQsetnonblocking = pq.PQsetnonblocking +PQsetnonblocking.argtypes = [PGconn_ptr, c_int] +PQsetnonblocking.restype = c_int + +PQisnonblocking = pq.PQisnonblocking +PQisnonblocking.argtypes = [PGconn_ptr] +PQisnonblocking.restype = c_int + +PQflush = pq.PQflush +PQflush.argtypes = [PGconn_ptr] +PQflush.restype == c_int + + # 33.11. Miscellaneous Functions PQfreemem = pq.PQfreemem diff --git a/psycopg3/pq/pq_ctypes.py b/psycopg3/pq/pq_ctypes.py index 27f32f418..7502f25b8 100644 --- a/psycopg3/pq/pq_ctypes.py +++ b/psycopg3/pq/pq_ctypes.py @@ -183,6 +183,13 @@ class PGconn: raise MemoryError("couldn't allocate PGresult") return PGresult(rv) + def send_query(self, command): + if not isinstance(command, bytes): + raise TypeError( + "bytes expected, got %s instead" % type(command).__name__ + ) + return impl.PQsendQuery(self.pgconn_ptr, command) + def exec_params( self, command, @@ -325,6 +332,25 @@ class PGconn: raise MemoryError("couldn't allocate PGresult") return PGresult(rv) + def get_result(self): + rv = impl.PQgetResult(self.pgconn_ptr) + return PGresult(rv) if rv else None + + def consume_input(self): + return impl.PQconsumeInput(self.pgconn_ptr) + + def is_busy(self): + return impl.PQisBusy(self.pgconn_ptr) + + def is_non_blocking(self): + return impl.PQisnonblocking(self.pgconn_ptr) + + def set_non_blocking(self, arg): + return impl.PQsetnonblocking(self.pgconn_ptr, arg) + + def flush(self): + return impl.PQflush(self.pgconn_ptr) + class PGresult: __slots__ = ("pgresult_ptr",) diff --git a/tests/pq/test_async.py b/tests/pq/test_async.py new file mode 100644 index 000000000..be2971c98 --- /dev/null +++ b/tests/pq/test_async.py @@ -0,0 +1,53 @@ +from select import select + + +def test_send_query(pq, pgconn): + # This test shows how to process an async query in all its glory + pgconn.set_non_blocking(1) + + # Long query to make sure we have to wait on send + pgconn.send_query( + b"/* %s */ select pg_sleep(0.01); select 1 as foo;" + % (b"x" * 1_000_000) + ) + + # send loop + waited_on_send = 0 + while 1: + f = pgconn.flush() + assert f != -1 + if f == 0: + break + + waited_on_send += 1 + + rl, wl, xl = select([pgconn.socket], [pgconn.socket], []) + assert not (rl and wl) + if wl: + continue # call flush again() + if rl: + assert pgconn.consume_input() == 1, pgconn.error_message + continue + + assert waited_on_send + + # read loop + results = [] + while 1: + assert pgconn.consume_input() == 1, pgconn.error_message + if pgconn.is_busy(): + select([pgconn.socket], [], []) + continue + res = pgconn.get_result() + if res is None: + break + assert res.status == pq.ExecStatus.PGRES_TUPLES_OK + results.append(res) + + assert len(results) == 2 + assert results[0].nfields == 1 + assert results[0].fname(0) == b"pg_sleep" + assert results[0].get_value(0, 0) == b"" + assert results[1].nfields == 1 + assert results[1].fname(0) == b"foo" + assert results[1].get_value(0, 0) == b"1"