]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Waiting implementation exposed as connection method
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 21 Mar 2020 10:43:27 +0000 (23:43 +1300)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 21 Mar 2020 12:25:24 +0000 (01:25 +1300)
psycopg3/connection.py
tests/pq/test_async.py

index c93d14f46cdc29fa9735e3d6cbb780b7c72cf11e..12fbfcbe7b953d0c8394b9594c8ab58801427d06 100644 (file)
@@ -117,7 +117,7 @@ class Connection(BaseConnection):
             raise NotImplementedError()
         conninfo = make_conninfo(conninfo, **kwargs)
         gen = cls._connect_gen(conninfo)
-        pgconn = wait_select(gen)
+        pgconn = cls.wait(gen)
         return cls(pgconn)
 
     def commit(self):
@@ -133,13 +133,17 @@ class Connection(BaseConnection):
                 return
 
             self.pgconn.send_query(command)
-            (pgres,) = wait_select(self._exec_gen(self.pgconn))
+            (pgres,) = self.wait(self._exec_gen(self.pgconn))
             if pgres.status != pq.ExecStatus.COMMAND_OK:
                 raise exc.OperationalError(
                     f"error on {command.decode('utf8')}:"
                     f" {pq.error_message(pgres)}"
                 )
 
+    @classmethod
+    def wait(cls, gen):
+        return wait_select(gen)
+
 
 class AsyncConnection(BaseConnection):
     """
@@ -153,7 +157,7 @@ class AsyncConnection(BaseConnection):
     async def connect(cls, conninfo, **kwargs):
         conninfo = make_conninfo(conninfo, **kwargs)
         gen = cls._connect_gen(conninfo)
-        pgconn = await wait_async(gen)
+        pgconn = await cls.wait(gen)
         return cls(pgconn)
 
     async def commit(self):
@@ -169,9 +173,13 @@ class AsyncConnection(BaseConnection):
                 return
 
             self.pgconn.send_query(command)
-            (pgres,) = await wait_async(self._exec_gen(self.pgconn))
+            (pgres,) = await self.wait(self._exec_gen(self.pgconn))
             if pgres.status != pq.ExecStatus.COMMAND_OK:
                 raise exc.OperationalError(
                     f"error on {command.decode('utf8')}:"
                     f" {pq.error_message(pgres)}"
                 )
+
+    @classmethod
+    async def wait(cls, gen):
+        return await wait_async(gen)
index eb90f21192e6ce983bdb4286e12ad4cf45c4c83e..2f998552b61d00edd5c5b5573faac0f6bf9e82fc 100644 (file)
@@ -1,7 +1,5 @@
 from select import select
 
-from psycopg3.waiting import wait_select
-
 
 def test_send_query(pq, pgconn):
     # This test shows how to process an async query in all its glory
@@ -60,7 +58,7 @@ def test_send_query_compact_test(pq, conn):
         b"/* %s */ select pg_sleep(0.01); select 1 as foo;"
         % (b"x" * 1_000_000)
     )
-    results = wait_select(conn._exec_gen(conn.pgconn))
+    results = conn.wait(conn._exec_gen(conn.pgconn))
 
     assert len(results) == 2
     assert results[0].nfields == 1
@@ -73,6 +71,6 @@ def test_send_query_compact_test(pq, conn):
 
 def test_send_query_params(pq, conn):
     res = conn.pgconn.send_query_params(b"select $1::int + $2", [b"5", b"3"])
-    (res,) = wait_select(conn._exec_gen(conn.pgconn))
+    (res,) = conn.wait(conn._exec_gen(conn.pgconn))
     assert res.status == pq.ExecStatus.TUPLES_OK
     assert res.get_value(0, 0) == b"8"