f"sending query and params failed: {error_message(self)}"
)
+ def send_prepare(
+ self,
+ name: bytes,
+ command: bytes,
+ param_types: Optional[Sequence[int]] = None,
+ ) -> None:
+ atypes: Optional[Array[impl.Oid]]
+ if param_types is None:
+ nparams = 0
+ atypes = None
+ else:
+ nparams = len(param_types)
+ atypes = (impl.Oid * nparams)(*param_types)
+
+ self._ensure_pgconn()
+ if not impl.PQsendPrepare(
+ self.pgconn_ptr, name, command, nparams, atypes
+ ):
+ raise PQerror(
+ f"sending query and params failed: {error_message(self)}"
+ )
+
+ def send_query_prepared(
+ self,
+ name: bytes,
+ param_values: Sequence[Optional[bytes]],
+ param_formats: Optional[Sequence[Format]] = None,
+ result_format: Format = Format.TEXT,
+ ) -> None:
+ # repurpose this function with a cheeky replacement of query with name,
+ # drop the param_types from the result
+ args = self._query_params_args(
+ name, param_values, None, param_formats, result_format
+ )
+ args = args[:3] + args[4:]
+
+ self._ensure_pgconn()
+ if not impl.PQsendQueryPrepared(*args):
+ raise PQerror(
+ f"sending prepared query failed: {error_message(self)}"
+ )
+
def _query_params_args(
self,
command: bytes,
def test_send_query_params(pq, pgconn):
- res = pgconn.send_query_params(b"select $1::int + $2", [b"5", b"3"])
+ pgconn.send_query_params(b"select $1::int + $2", [b"5", b"3"])
(res,) = psycopg3.waiting.wait(psycopg3.generators.execute(pgconn))
assert res.status == pq.ExecStatus.TUPLES_OK
assert res.get_value(0, 0) == b"8"
pgconn.finish()
with pytest.raises(psycopg3.OperationalError):
pgconn.send_query_params(b"select $1", [b"1"])
+
+
+def test_send_prepare(pq, pgconn):
+ pgconn.send_prepare(b"prep", b"select $1::int + $2::int")
+ (res,) = psycopg3.waiting.wait(psycopg3.generators.execute(pgconn))
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+
+ pgconn.send_query_prepared(b"prep", [b"3", b"5"])
+ (res,) = psycopg3.waiting.wait(psycopg3.generators.execute(pgconn))
+ assert res.get_value(0, 0) == b"8"
+
+ pgconn.finish()
+ with pytest.raises(psycopg3.OperationalError):
+ pgconn.send_prepare(b"prep", b"select $1::int + $2::int")
+ with pytest.raises(psycopg3.OperationalError):
+ pgconn.send_query_prepared(b"prep", [b"3", b"5"])
+
+
+def test_send_prepare_types(pq, pgconn):
+ pgconn.send_prepare(b"prep", b"select $1 + $2", [23, 23])
+ (res,) = psycopg3.waiting.wait(psycopg3.generators.execute(pgconn))
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+
+ pgconn.send_query_prepared(b"prep", [b"3", b"5"])
+ (res,) = psycopg3.waiting.wait(psycopg3.generators.execute(pgconn))
+ assert res.get_value(0, 0) == b"8"
+
+
+def test_send_prepared_binary_in(pq, pgconn):
+ val = b"foo\00bar"
+ pgconn.send_prepare(b"", b"select length($1::bytea), length($2::bytea)")
+ (res,) = psycopg3.waiting.wait(psycopg3.generators.execute(pgconn))
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+
+ pgconn.send_query_prepared(b"", [val, val], param_formats=[0, 1])
+ (res,) = psycopg3.waiting.wait(psycopg3.generators.execute(pgconn))
+ assert res.status == pq.ExecStatus.TUPLES_OK
+ assert res.get_value(0, 0) == b"3"
+ assert res.get_value(0, 1) == b"7"
+
+ with pytest.raises(ValueError):
+ pgconn.exec_params(b"select $1::bytea", [val], param_formats=[1, 1])
+
+
+@pytest.mark.parametrize(
+ "fmt, out", [(0, b"\\x666f6f00626172"), (1, b"foo\00bar")]
+)
+def test_send_prepared_binary_out(pq, pgconn, fmt, out):
+ val = b"foo\00bar"
+ pgconn.send_prepare(b"", b"select $1::bytea")
+ (res,) = psycopg3.waiting.wait(psycopg3.generators.execute(pgconn))
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+
+ pgconn.send_query_prepared(
+ b"", [val], param_formats=[1], result_format=fmt
+ )
+ (res,) = psycopg3.waiting.wait(psycopg3.generators.execute(pgconn))
+ assert res.status == pq.ExecStatus.TUPLES_OK
+ assert res.get_value(0, 0) == out