]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
fix: don't use PQsendQuery in pipeline mode
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Wed, 8 Jun 2022 14:25:28 +0000 (16:25 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Wed, 8 Jun 2022 14:30:43 +0000 (16:30 +0200)
There is no reason to use it: the only reason to not use
PQsendQueryParams is to send multiple statements, but this is not
possible in pipeline mode anyway.

Using PQsendQuery seems to produce a spurious Close message, which the
libpq is surprised to receive. Issue reported to pgsql-bugs.

https://www.postgresql.org/message-id/CA%2Bmi_8bvD0_CW3sumgwPvWdNzXY32itoG_16tDYRu_1S2gV2iw%40mail.gmail.com

Close #314

psycopg/psycopg/client_cursor.py
psycopg/psycopg/cursor.py
tests/test_client_cursor.py
tests/test_client_cursor_async.py
tests/test_pipeline.py
tests/test_pipeline_async.py

index bdaa4e127c3159ef83129f382600042e95202b81..2a1ce3078c7602bbf3747bab7330da48aea25360 100644 (file)
@@ -58,22 +58,19 @@ class ClientCursorMixin(BaseCursor[ConnectionType, Row]):
             )
 
         self._query = query
-        if no_pqexec:
-            if self._conn._pipeline:
-                self._conn._pipeline.command_queue.append(
-                    partial(self._pgconn.send_query_params, None)
-                )
-            else:
-                self._pgconn.send_query_params(query.query, None)
+
+        if self._conn._pipeline:
+            # In pipeline mode always use PQsendQueryParams - see #314
+            # Multiple statements in the same query are not allowed anyway.
+            self._conn._pipeline.command_queue.append(
+                partial(self._pgconn.send_query_params, query.query, None)
+            )
+        elif no_pqexec:
+            self._pgconn.send_query_params(query.query, None)
         else:
             # if we don't have to, let's use exec_ as it can run more than
             # one query in one go
-            if self._conn._pipeline:
-                self._conn._pipeline.command_queue.append(
-                    partial(self._pgconn.send_query, query.query)
-                )
-            else:
-                self._pgconn.send_query(query.query)
+            self._pgconn.send_query(query.query)
 
     def _convert_query(
         self, query: Query, params: Optional[Params] = None
index a2777e470026d4c5da9ebbf5e204545fd08a749b..4a0de6261ff914aeefd7e11dddc16a19c89ade95 100644 (file)
@@ -448,35 +448,32 @@ class BaseCursor(Generic[ConnectionType, Row]):
             fmt = BINARY if binary else TEXT
 
         self._query = query
-        if query.params or no_pqexec or fmt == BINARY:
-            if self._conn._pipeline:
-                self._conn._pipeline.command_queue.append(
-                    partial(
-                        self._pgconn.send_query_params,
-                        query.query,
-                        query.params,
-                        param_formats=query.formats,
-                        param_types=query.types,
-                        result_format=fmt,
-                    )
-                )
-            else:
-                self._pgconn.send_query_params(
+
+        if self._conn._pipeline:
+            # In pipeline mode always use PQsendQueryParams - see #314
+            # Multiple statements in the same query are not allowed anyway.
+            self._conn._pipeline.command_queue.append(
+                partial(
+                    self._pgconn.send_query_params,
                     query.query,
                     query.params,
                     param_formats=query.formats,
                     param_types=query.types,
                     result_format=fmt,
                 )
+            )
+        elif no_pqexec or query.params or fmt == BINARY:
+            self._pgconn.send_query_params(
+                query.query,
+                query.params,
+                param_formats=query.formats,
+                param_types=query.types,
+                result_format=fmt,
+            )
         else:
             # if we don't have to, let's use exec_ as it can run more than
             # one query in one go
-            if self._conn._pipeline:
-                self._conn._pipeline.command_queue.append(
-                    partial(self._pgconn.send_query, query.query)
-                )
-            else:
-                self._pgconn.send_query(query.query)
+            self._pgconn.send_query(query.query)
 
     def _convert_query(
         self, query: Query, params: Optional[Params] = None
index dca40221d9929ccf7a5b5412463485aba24c9aa5..e42993b7183fed1c0d09c923a98413457b470469 100644 (file)
@@ -807,3 +807,18 @@ def test_mogrify(conn):
     conn.execute("set client_encoding to latin1")
     with pytest.raises(UnicodeEncodeError):
         cur.mogrify("select %(s)s", {"s": "\u20ac"})
+
+
+@pytest.mark.libpq(">= 14")
+@pytest.mark.pipeline
+def test_message_0x33(conn):
+    # https://github.com/psycopg/psycopg/issues/314
+    notices = []
+    conn.add_notice_handler(lambda diag: notices.append(diag.message_primary))
+
+    conn.autocommit = True
+    with conn.pipeline():
+        cur = conn.execute("select 'test'")
+        cur.fetchone() == ("test",)
+
+    assert not notices
index a4730886f7dd4efa68fcc00471c03f89881cc4dc..7bfd5666d18d7e00dbe36db1927b4b93e29e7746 100644 (file)
@@ -681,3 +681,18 @@ async def test_mogrify(aconn):
     await aconn.execute("set client_encoding to latin1")
     with pytest.raises(UnicodeEncodeError):
         cur.mogrify("select %(s)s", {"s": "\u20ac"})
+
+
+@pytest.mark.libpq(">= 14")
+@pytest.mark.pipeline
+async def test_message_0x33(aconn):
+    # https://github.com/psycopg/psycopg/issues/314
+    notices = []
+    aconn.add_notice_handler(lambda diag: notices.append(diag.message_primary))
+
+    await aconn.set_autocommit(True)
+    async with aconn.pipeline():
+        cur = await aconn.execute("select 'test'")
+        await cur.fetchone() == ("test",)
+
+    assert not notices
index dd890f8496d8cf361c55a06cc0337d1b12b3ae53..9da08c761f2f50d2355e3644dfc642346f124778 100644 (file)
@@ -488,6 +488,7 @@ def test_rollback_transaction(conn):
 
 
 def test_message_0x33(conn):
+    # https://github.com/psycopg/psycopg/issues/314
     notices = []
     conn.add_notice_handler(lambda diag: notices.append(diag.message_primary))
 
index ef929b3b1fc0b69f6cdb7be6132f40d33309bab1..2dd25b6ea2d3c192550eb3399a02d7426e961e7c 100644 (file)
@@ -490,6 +490,7 @@ async def test_rollback_transaction(aconn):
 
 
 async def test_message_0x33(aconn):
+    # https://github.com/psycopg/psycopg/issues/314
     notices = []
     aconn.add_notice_handler(lambda diag: notices.append(diag.message_primary))