From: Daniele Varrazzo Date: Mon, 9 May 2022 00:52:39 +0000 (+0200) Subject: fix: fetch results on Pipeline.sync(). X-Git-Tag: 3.1~113^2~9 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=773944be133e0269b60031f3645664c1f2f1cf0b;p=thirdparty%2Fpsycopg.git fix: fetch results on Pipeline.sync(). This allows a sync() call to establish an high level synchronization point between the application and the server. If there were errors in the processing so far, sync will throw the exception. See #296. --- diff --git a/docs/advanced/pipeline.rst b/docs/advanced/pipeline.rst index b1a0b0e33..e9fee5057 100644 --- a/docs/advanced/pipeline.rst +++ b/docs/advanced/pipeline.rst @@ -257,10 +257,13 @@ For example, the following block: >>> with psycopg.connect(autocommit=True) as conn: ... with conn.pipeline() as p, conn.cursor() as cur: - ... cur.execute("INSERT INTO mytable (data) VALUES (%s)", ["one"]) - ... cur.execute("INSERT INTO no_such_table (data) VALUES (%s)", ["two"]) - ... conn.execute("INSERT INTO mytable (data) VALUES (%s)", ["three"]) - ... p.sync() + ... try: + ... cur.execute("INSERT INTO mytable (data) VALUES (%s)", ["one"]) + ... cur.execute("INSERT INTO no_such_table (data) VALUES (%s)", ["two"]) + ... conn.execute("INSERT INTO mytable (data) VALUES (%s)", ["three"]) + ... p.sync() + ... except psycopg.errors.UndefinedTable: + ... pass ... cur.execute("INSERT INTO mytable (data) VALUES (%s)", ["four"]) fails with the error ``relation "no_such_table" does not exist`` and, at the diff --git a/psycopg/psycopg/_pipeline.py b/psycopg/psycopg/_pipeline.py index 80e47acb8..7aab4f2f7 100644 --- a/psycopg/psycopg/_pipeline.py +++ b/psycopg/psycopg/_pipeline.py @@ -87,13 +87,15 @@ class BasePipeline: def _sync_gen(self) -> PQGen[None]: self._enqueue_sync() yield from self._communicate_gen() + yield from self._fetch_gen(flush=False) def _exit_gen(self) -> PQGen[None]: """Exit current pipeline by sending a Sync and, unless within a nested pipeline, also fetch back all remaining results. """ try: - yield from self._sync_gen() + self._enqueue_sync() + yield from self._communicate_gen() finally: if self.level == 1: # No need to force flush since we emitted a sync just before. diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index e5c7d37ee..e40fac3c5 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -201,6 +201,22 @@ def test_pipeline_commit_aborted(conn): conn.commit() +def test_sync_syncs_results(conn): + with conn.pipeline() as p: + cur = conn.execute("select 1") + assert cur.statusmessage is None + p.sync() + assert cur.statusmessage == "SELECT 1" + + +def test_sync_syncs_errors(conn): + conn.autocommit = True + with conn.pipeline() as p: + conn.execute("select 1 from nosuchtable") + with pytest.raises(e.UndefinedTable): + p.sync() + + def test_fetch_no_result(conn): with conn.pipeline(): cur = conn.cursor() diff --git a/tests/test_pipeline_async.py b/tests/test_pipeline_async.py index af7f5bb08..3bbb98579 100644 --- a/tests/test_pipeline_async.py +++ b/tests/test_pipeline_async.py @@ -204,6 +204,22 @@ async def test_pipeline_commit_aborted(aconn): await aconn.commit() +async def test_sync_syncs_results(aconn): + async with aconn.pipeline() as p: + cur = await aconn.execute("select 1") + assert cur.statusmessage is None + await p.sync() + assert cur.statusmessage == "SELECT 1" + + +async def test_sync_syncs_errors(aconn): + await aconn.set_autocommit(True) + async with aconn.pipeline() as p: + await aconn.execute("select 1 from nosuchtable") + with pytest.raises(e.UndefinedTable): + await p.sync() + + async def test_fetch_no_result(aconn): async with aconn.pipeline(): cur = aconn.cursor()