From e507918489aad84df730b54b7da53b45b4550ba3 Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Wed, 24 Aug 2022 00:39:40 +0200 Subject: [PATCH] fix: avoid all uses of PQconn.send_query in pipeline mode We stopped using it in normal querying because, until libpq 14.4, the libpq complained about "message type 0x33 arrived from server while idle" (see #314). But usage remained for internal queries such as BEGIN. In libpq 14.5, Postgres devs tried to fix the issue... and broke PQsendQuery altogether :( Hopefully the issue will be solved in future libpq versions, but we can work around it completely. Close #350. --- psycopg/psycopg/_pipeline.py | 4 ---- psycopg/psycopg/connection.py | 20 +++++++------------- tests/pq/test_pq.py | 15 --------------- tests/scripts/pipeline-demo.py | 5 +++-- tests/test_generators.py | 11 ++++++----- 5 files changed, 16 insertions(+), 39 deletions(-) diff --git a/psycopg/psycopg/_pipeline.py b/psycopg/psycopg/_pipeline.py index 6b941bf28..023de014c 100644 --- a/psycopg/psycopg/_pipeline.py +++ b/psycopg/psycopg/_pipeline.py @@ -83,10 +83,6 @@ class BasePipeline: " v14 or greater required for pipeline mode" ) - # Bug #350 - if pq.version() == 140005: - return f"pipeline mode broken in libpq version {pq.version()}" - return "" def _enter_gen(self) -> PQGen[None]: diff --git a/psycopg/psycopg/connection.py b/psycopg/psycopg/connection.py index 6c11d17ec..222cd1688 100644 --- a/psycopg/psycopg/connection.py +++ b/psycopg/psycopg/connection.py @@ -447,23 +447,17 @@ class BaseConnection(Generic[Row]): command = command.as_bytes(self) if self._pipeline: - if result_format == TEXT: - cmd = partial(self.pgconn.send_query, command) - else: - cmd = partial( - self.pgconn.send_query_params, - command, - None, - result_format=result_format, - ) + cmd = partial( + self.pgconn.send_query_params, + command, + None, + result_format=result_format, + ) self._pipeline.command_queue.append(cmd) self._pipeline.result_queue.append(None) return None - if result_format == TEXT: - self.pgconn.send_query(command) - else: - self.pgconn.send_query_params(command, None, result_format=result_format) + self.pgconn.send_query_params(command, None, result_format=result_format) result = (yield from execute(self.pgconn))[-1] if result.status != COMMAND_OK and result.status != TUPLES_OK: diff --git a/tests/pq/test_pq.py b/tests/pq/test_pq.py index a37fff333..076c3b62b 100644 --- a/tests/pq/test_pq.py +++ b/tests/pq/test_pq.py @@ -37,7 +37,6 @@ def test_want_import_version(): @pytest.mark.libpq(">= 14") -@pytest.mark.libpq("!= 14.5") def test_pipeline_supported(conn): assert psycopg.Pipeline.is_supported() assert psycopg.AsyncPipeline.is_supported() @@ -56,17 +55,3 @@ def test_pipeline_not_supported(conn): pass assert "too old" in str(exc.value) - - -@pytest.mark.libpq("14.5") -def test_pipeline_not_supported_14_5(conn): - # Affected by #350 - # NOTE: we might support it in binary using a patched libpq version. - assert not psycopg.Pipeline.is_supported() - assert not psycopg.AsyncPipeline.is_supported() - - with pytest.raises(psycopg.NotSupportedError) as exc: - with conn.pipeline(): - pass - - assert "broken" in str(exc.value) diff --git a/tests/scripts/pipeline-demo.py b/tests/scripts/pipeline-demo.py index 9035911e3..ec952293a 100644 --- a/tests/scripts/pipeline-demo.py +++ b/tests/scripts/pipeline-demo.py @@ -60,6 +60,7 @@ class LoggingPGconn: return getattr(self._pgconn, name) def send_query(self, command: bytes) -> None: + self._logger.warning("PQsendQuery broken in libpq 14.5") self._pgconn.send_query(command) self._logger.info("sent %s", command.decode()) @@ -140,7 +141,7 @@ def prepare_pipeline_demo_pq( if qname == "prepare": pgconn.send_prepare(qname.encode(), query.encode()) else: - pgconn.send_query(query.encode()) + pgconn.send_query_params(query.encode(), None) results_queue.append(qname) committed = False @@ -155,7 +156,7 @@ def prepare_pipeline_demo_pq( elif not committed: committed = True - commands.append(partial(pgconn.send_query, b"COMMIT")) + commands.append(partial(pgconn.send_query_params, b"COMMIT", None)) results_queue.append("commit") elif not synced: diff --git a/tests/test_generators.py b/tests/test_generators.py index c396d0425..8aba73f6d 100644 --- a/tests/test_generators.py +++ b/tests/test_generators.py @@ -41,9 +41,9 @@ def _run_pipeline_communicate(pgconn, generators, commands, expected_statuses): def test_pipeline_communicate_multi_pipeline(pgconn, pipeline, generators): commands = deque( [ - partial(pgconn.send_query, b"select 1"), + partial(pgconn.send_query_params, b"select 1", None), pgconn.pipeline_sync, - partial(pgconn.send_query, b"select 2"), + partial(pgconn.send_query_params, b"select 2", None), pgconn.pipeline_sync, ] ) @@ -60,7 +60,8 @@ def test_pipeline_communicate_multi_pipeline(pgconn, pipeline, generators): def test_pipeline_communicate_no_sync(pgconn, pipeline, generators): numqueries = 10 commands = deque( - [partial(pgconn.send_query, b"select repeat('xyzxz', 12)")] * numqueries + [partial(pgconn.send_query_params, b"select repeat('xyzxz', 12)", None)] + * numqueries + [pgconn.send_flush_request] ) expected_statuses = [pq.ExecStatus.TUPLES_OK] * numqueries @@ -89,7 +90,7 @@ def test_pipeline_communicate_abort(pgconn, pipeline_demo, pipeline, generators) commands = deque( [ partial(pgconn.send_query_params, insert_sql, [b"1"]), - partial(pgconn.send_query, b"select no_such_function(1)"), + partial(pgconn.send_query_params, b"select no_such_function(1)", None), partial(pgconn.send_query_params, insert_sql, [b"2"]), pgconn.pipeline_sync, partial(pgconn.send_query_params, insert_sql, [b"3"]), @@ -141,7 +142,7 @@ def test_pipeline_communicate_uniqviol(pgconn, pipeline_uniqviol, pipeline, gene partial(pgconn.send_query_prepared, b"insertion", [b"1", b"2"]), partial(pgconn.send_query_prepared, b"insertion", [b"3", b"2"]), partial(pgconn.send_query_prepared, b"insertion", [b"4", b"2"]), - partial(pgconn.send_query, b"commit"), + partial(pgconn.send_query_params, b"commit", None), ] ) expected_statuses = [ -- 2.47.2