" v14 or greater required for pipeline mode"
)
- # Bug #350
- if pq.version() == 140005:
- return f"pipeline mode broken in libpq version {pq.version()}"
-
return ""
def _enter_gen(self) -> PQGen[None]:
command = command.as_bytes(self)
if self._pipeline:
- if result_format == TEXT:
- cmd = partial(self.pgconn.send_query, command)
- else:
- cmd = partial(
- self.pgconn.send_query_params,
- command,
- None,
- result_format=result_format,
- )
+ cmd = partial(
+ self.pgconn.send_query_params,
+ command,
+ None,
+ result_format=result_format,
+ )
self._pipeline.command_queue.append(cmd)
self._pipeline.result_queue.append(None)
return None
- if result_format == TEXT:
- self.pgconn.send_query(command)
- else:
- self.pgconn.send_query_params(command, None, result_format=result_format)
+ self.pgconn.send_query_params(command, None, result_format=result_format)
result = (yield from execute(self.pgconn))[-1]
if result.status != COMMAND_OK and result.status != TUPLES_OK:
@pytest.mark.libpq(">= 14")
-@pytest.mark.libpq("!= 14.5")
def test_pipeline_supported(conn):
assert psycopg.Pipeline.is_supported()
assert psycopg.AsyncPipeline.is_supported()
pass
assert "too old" in str(exc.value)
-
-
-@pytest.mark.libpq("14.5")
-def test_pipeline_not_supported_14_5(conn):
- # Affected by #350
- # NOTE: we might support it in binary using a patched libpq version.
- assert not psycopg.Pipeline.is_supported()
- assert not psycopg.AsyncPipeline.is_supported()
-
- with pytest.raises(psycopg.NotSupportedError) as exc:
- with conn.pipeline():
- pass
-
- assert "broken" in str(exc.value)
return getattr(self._pgconn, name)
def send_query(self, command: bytes) -> None:
+ self._logger.warning("PQsendQuery broken in libpq 14.5")
self._pgconn.send_query(command)
self._logger.info("sent %s", command.decode())
if qname == "prepare":
pgconn.send_prepare(qname.encode(), query.encode())
else:
- pgconn.send_query(query.encode())
+ pgconn.send_query_params(query.encode(), None)
results_queue.append(qname)
committed = False
elif not committed:
committed = True
- commands.append(partial(pgconn.send_query, b"COMMIT"))
+ commands.append(partial(pgconn.send_query_params, b"COMMIT", None))
results_queue.append("commit")
elif not synced:
def test_pipeline_communicate_multi_pipeline(pgconn, pipeline, generators):
commands = deque(
[
- partial(pgconn.send_query, b"select 1"),
+ partial(pgconn.send_query_params, b"select 1", None),
pgconn.pipeline_sync,
- partial(pgconn.send_query, b"select 2"),
+ partial(pgconn.send_query_params, b"select 2", None),
pgconn.pipeline_sync,
]
)
def test_pipeline_communicate_no_sync(pgconn, pipeline, generators):
numqueries = 10
commands = deque(
- [partial(pgconn.send_query, b"select repeat('xyzxz', 12)")] * numqueries
+ [partial(pgconn.send_query_params, b"select repeat('xyzxz', 12)", None)]
+ * numqueries
+ [pgconn.send_flush_request]
)
expected_statuses = [pq.ExecStatus.TUPLES_OK] * numqueries
commands = deque(
[
partial(pgconn.send_query_params, insert_sql, [b"1"]),
- partial(pgconn.send_query, b"select no_such_function(1)"),
+ partial(pgconn.send_query_params, b"select no_such_function(1)", None),
partial(pgconn.send_query_params, insert_sql, [b"2"]),
pgconn.pipeline_sync,
partial(pgconn.send_query_params, insert_sql, [b"3"]),
partial(pgconn.send_query_prepared, b"insertion", [b"1", b"2"]),
partial(pgconn.send_query_prepared, b"insertion", [b"3", b"2"]),
partial(pgconn.send_query_prepared, b"insertion", [b"4", b"2"]),
- partial(pgconn.send_query, b"commit"),
+ partial(pgconn.send_query_params, b"commit", None),
]
)
expected_statuses = [