]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
fix: fetch results on Pipeline.sync().
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Mon, 9 May 2022 00:52:39 +0000 (02:52 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Mon, 9 May 2022 01:10:46 +0000 (03:10 +0200)
This allows a sync() call to establish an high level synchronization
point between the application and the server. If there were errors in
the processing so far, sync will throw the exception.

See #296.

docs/advanced/pipeline.rst
psycopg/psycopg/_pipeline.py
tests/test_pipeline.py
tests/test_pipeline_async.py

index b1a0b0e33086e73eba85dd7387788d34dc44eb18..e9fee5057b484f58fa966b418dea082a3936b7a9 100644 (file)
@@ -257,10 +257,13 @@ For example, the following block:
 
     >>> with psycopg.connect(autocommit=True) as conn:
     ...     with conn.pipeline() as p, conn.cursor() as cur:
-    ...         cur.execute("INSERT INTO mytable (data) VALUES (%s)", ["one"])
-    ...         cur.execute("INSERT INTO no_such_table (data) VALUES (%s)", ["two"])
-    ...         conn.execute("INSERT INTO mytable (data) VALUES (%s)", ["three"])
-    ...         p.sync()
+    ...         try:
+    ...             cur.execute("INSERT INTO mytable (data) VALUES (%s)", ["one"])
+    ...             cur.execute("INSERT INTO no_such_table (data) VALUES (%s)", ["two"])
+    ...             conn.execute("INSERT INTO mytable (data) VALUES (%s)", ["three"])
+    ...             p.sync()
+    ...         except psycopg.errors.UndefinedTable:
+    ...             pass
     ...         cur.execute("INSERT INTO mytable (data) VALUES (%s)", ["four"])
 
 fails with the error ``relation "no_such_table" does not exist`` and, at the
index 80e47acb87889371747befee739c442dc7d19fc5..7aab4f2f7a6da6bf8804754ba297b2e33cf2f466 100644 (file)
@@ -87,13 +87,15 @@ class BasePipeline:
     def _sync_gen(self) -> PQGen[None]:
         self._enqueue_sync()
         yield from self._communicate_gen()
+        yield from self._fetch_gen(flush=False)
 
     def _exit_gen(self) -> PQGen[None]:
         """Exit current pipeline by sending a Sync and, unless within a nested
         pipeline, also fetch back all remaining results.
         """
         try:
-            yield from self._sync_gen()
+            self._enqueue_sync()
+            yield from self._communicate_gen()
         finally:
             if self.level == 1:
                 # No need to force flush since we emitted a sync just before.
index e5c7d37ee5f20c821149b224dd431226c4b55a7e..e40fac3c574f036f6bc493e42c9ce80ea2562e41 100644 (file)
@@ -201,6 +201,22 @@ def test_pipeline_commit_aborted(conn):
             conn.commit()
 
 
+def test_sync_syncs_results(conn):
+    with conn.pipeline() as p:
+        cur = conn.execute("select 1")
+        assert cur.statusmessage is None
+        p.sync()
+        assert cur.statusmessage == "SELECT 1"
+
+
+def test_sync_syncs_errors(conn):
+    conn.autocommit = True
+    with conn.pipeline() as p:
+        conn.execute("select 1 from nosuchtable")
+        with pytest.raises(e.UndefinedTable):
+            p.sync()
+
+
 def test_fetch_no_result(conn):
     with conn.pipeline():
         cur = conn.cursor()
index af7f5bb082615efbe95f8c7afa4c80c44fa6a0a8..3bbb985791eab21503e888ffa5afdde3e301c1c2 100644 (file)
@@ -204,6 +204,22 @@ async def test_pipeline_commit_aborted(aconn):
             await aconn.commit()
 
 
+async def test_sync_syncs_results(aconn):
+    async with aconn.pipeline() as p:
+        cur = await aconn.execute("select 1")
+        assert cur.statusmessage is None
+        await p.sync()
+        assert cur.statusmessage == "SELECT 1"
+
+
+async def test_sync_syncs_errors(aconn):
+    await aconn.set_autocommit(True)
+    async with aconn.pipeline() as p:
+        await aconn.execute("select 1 from nosuchtable")
+        with pytest.raises(e.UndefinedTable):
+            await p.sync()
+
+
 async def test_fetch_no_result(aconn):
     async with aconn.pipeline():
         cur = aconn.cursor()