]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
fix: avoid all uses of PQconn.send_query in pipeline mode
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Tue, 23 Aug 2022 22:39:40 +0000 (00:39 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Tue, 23 Aug 2022 23:20:11 +0000 (01:20 +0200)
We stopped using it in normal querying because, until libpq 14.4, the
libpq complained about "message type 0x33 arrived from server while
idle" (see #314). But usage remained for internal queries such as BEGIN.

In libpq 14.5, Postgres devs tried to fix the issue... and broke
PQsendQuery altogether :(

Hopefully the issue will be solved in future libpq versions, but we can
work around it completely.

Close #350.

psycopg/psycopg/_pipeline.py
psycopg/psycopg/connection.py
tests/pq/test_pq.py
tests/scripts/pipeline-demo.py
tests/test_generators.py

index 6b941bf2831222fab510a94d0674492d8ea2f50d..023de014c50ac8313be838a2235641442ac71e8b 100644 (file)
@@ -83,10 +83,6 @@ class BasePipeline:
                 " v14 or greater required for pipeline mode"
             )
 
-        # Bug #350
-        if pq.version() == 140005:
-            return f"pipeline mode broken in libpq version {pq.version()}"
-
         return ""
 
     def _enter_gen(self) -> PQGen[None]:
index 6c11d17ecaedb25df9869c0a1183bb34487e370a..222cd16883b15c025256eb9ce30bf015b3520b3c 100644 (file)
@@ -447,23 +447,17 @@ class BaseConnection(Generic[Row]):
             command = command.as_bytes(self)
 
         if self._pipeline:
-            if result_format == TEXT:
-                cmd = partial(self.pgconn.send_query, command)
-            else:
-                cmd = partial(
-                    self.pgconn.send_query_params,
-                    command,
-                    None,
-                    result_format=result_format,
-                )
+            cmd = partial(
+                self.pgconn.send_query_params,
+                command,
+                None,
+                result_format=result_format,
+            )
             self._pipeline.command_queue.append(cmd)
             self._pipeline.result_queue.append(None)
             return None
 
-        if result_format == TEXT:
-            self.pgconn.send_query(command)
-        else:
-            self.pgconn.send_query_params(command, None, result_format=result_format)
+        self.pgconn.send_query_params(command, None, result_format=result_format)
 
         result = (yield from execute(self.pgconn))[-1]
         if result.status != COMMAND_OK and result.status != TUPLES_OK:
index a37fff33345542736275c2552771564ff915d760..076c3b62b9cf42a84a6c7589c085d156334a9fbb 100644 (file)
@@ -37,7 +37,6 @@ def test_want_import_version():
 
 
 @pytest.mark.libpq(">= 14")
-@pytest.mark.libpq("!= 14.5")
 def test_pipeline_supported(conn):
     assert psycopg.Pipeline.is_supported()
     assert psycopg.AsyncPipeline.is_supported()
@@ -56,17 +55,3 @@ def test_pipeline_not_supported(conn):
             pass
 
     assert "too old" in str(exc.value)
-
-
-@pytest.mark.libpq("14.5")
-def test_pipeline_not_supported_14_5(conn):
-    # Affected by #350
-    # NOTE: we might support it in binary using a patched libpq version.
-    assert not psycopg.Pipeline.is_supported()
-    assert not psycopg.AsyncPipeline.is_supported()
-
-    with pytest.raises(psycopg.NotSupportedError) as exc:
-        with conn.pipeline():
-            pass
-
-    assert "broken" in str(exc.value)
index 9035911e3ac4a30130255a3023ccf91fd740cf38..ec952293a18d79209ab7cce649935f9d596201fc 100644 (file)
@@ -60,6 +60,7 @@ class LoggingPGconn:
         return getattr(self._pgconn, name)
 
     def send_query(self, command: bytes) -> None:
+        self._logger.warning("PQsendQuery broken in libpq 14.5")
         self._pgconn.send_query(command)
         self._logger.info("sent %s", command.decode())
 
@@ -140,7 +141,7 @@ def prepare_pipeline_demo_pq(
         if qname == "prepare":
             pgconn.send_prepare(qname.encode(), query.encode())
         else:
-            pgconn.send_query(query.encode())
+            pgconn.send_query_params(query.encode(), None)
         results_queue.append(qname)
 
     committed = False
@@ -155,7 +156,7 @@ def prepare_pipeline_demo_pq(
 
         elif not committed:
             committed = True
-            commands.append(partial(pgconn.send_query, b"COMMIT"))
+            commands.append(partial(pgconn.send_query_params, b"COMMIT", None))
             results_queue.append("commit")
 
         elif not synced:
index c396d0425a4a9f7eb7d379055133ee94b2772d01..8aba73f6ded435b5853cab20fc887e0d4eba2448 100644 (file)
@@ -41,9 +41,9 @@ def _run_pipeline_communicate(pgconn, generators, commands, expected_statuses):
 def test_pipeline_communicate_multi_pipeline(pgconn, pipeline, generators):
     commands = deque(
         [
-            partial(pgconn.send_query, b"select 1"),
+            partial(pgconn.send_query_params, b"select 1", None),
             pgconn.pipeline_sync,
-            partial(pgconn.send_query, b"select 2"),
+            partial(pgconn.send_query_params, b"select 2", None),
             pgconn.pipeline_sync,
         ]
     )
@@ -60,7 +60,8 @@ def test_pipeline_communicate_multi_pipeline(pgconn, pipeline, generators):
 def test_pipeline_communicate_no_sync(pgconn, pipeline, generators):
     numqueries = 10
     commands = deque(
-        [partial(pgconn.send_query, b"select repeat('xyzxz', 12)")] * numqueries
+        [partial(pgconn.send_query_params, b"select repeat('xyzxz', 12)", None)]
+        * numqueries
         + [pgconn.send_flush_request]
     )
     expected_statuses = [pq.ExecStatus.TUPLES_OK] * numqueries
@@ -89,7 +90,7 @@ def test_pipeline_communicate_abort(pgconn, pipeline_demo, pipeline, generators)
     commands = deque(
         [
             partial(pgconn.send_query_params, insert_sql, [b"1"]),
-            partial(pgconn.send_query, b"select no_such_function(1)"),
+            partial(pgconn.send_query_params, b"select no_such_function(1)", None),
             partial(pgconn.send_query_params, insert_sql, [b"2"]),
             pgconn.pipeline_sync,
             partial(pgconn.send_query_params, insert_sql, [b"3"]),
@@ -141,7 +142,7 @@ def test_pipeline_communicate_uniqviol(pgconn, pipeline_uniqviol, pipeline, gene
             partial(pgconn.send_query_prepared, b"insertion", [b"1", b"2"]),
             partial(pgconn.send_query_prepared, b"insertion", [b"3", b"2"]),
             partial(pgconn.send_query_prepared, b"insertion", [b"4", b"2"]),
-            partial(pgconn.send_query, b"commit"),
+            partial(pgconn.send_query_params, b"commit", None),
         ]
     )
     expected_statuses = [