PQsendQuery.argtypes = [PGconn_ptr, c_char_p]
PQsendQuery.restype = c_int
-# TODO: PQsendQueryParams PQsendPrepare PQsendQueryPrepared
+PQsendQueryParams = pq.PQsendQueryParams
+PQsendQueryParams.argtypes = [
+ PGconn_ptr,
+ c_char_p,
+ c_int,
+ POINTER(Oid),
+ POINTER(c_char_p),
+ POINTER(c_int),
+ POINTER(c_int),
+ c_int,
+]
+PQsendQueryParams.restype = c_int
+
+# TODO: PQsendPrepare PQsendQueryPrepared
# PQsendDescribePrepared PQsendDescribePortal
PQgetResult = pq.PQgetResult
param_types=None,
param_formats=None,
result_format=0,
+ ):
+ args = self._query_params_args(
+ command, param_values, param_types, param_formats, result_format
+ )
+ rv = impl.PQexecParams(*args)
+ if not rv:
+ raise MemoryError("couldn't allocate PGresult")
+ return PGresult(rv)
+
+ def send_query_params(
+ self,
+ command,
+ param_values,
+ param_types=None,
+ param_formats=None,
+ result_format=0,
+ ):
+ args = self._query_params_args(
+ command, param_values, param_types, param_formats, result_format
+ )
+ if not impl.PQsendQueryParams(*args):
+ raise PQerror(
+ f"sending query and params failed: {error_message(self)}"
+ )
+
+ def _query_params_args(
+ self, command, param_values, param_types, param_formats, result_format,
):
if not isinstance(command, bytes):
raise TypeError(
)
aformats = (c_int * nparams)(*param_formats)
- rv = impl.PQexecParams(
+ return (
self.pgconn_ptr,
command,
nparams,
aformats,
result_format,
)
- if not rv:
- raise MemoryError("couldn't allocate PGresult")
- return PGresult(rv)
def prepare(self, name, command, param_types=None):
if not isinstance(name, bytes):
from select import select
+from psycopg3.waiting import wait_select
+
def test_send_query(pq, pgconn):
# This test shows how to process an async query in all its glory
assert results[1].nfields == 1
assert results[1].fname(0) == b"foo"
assert results[1].get_value(0, 0) == b"1"
+
+
+def test_send_query_compact_test(pq, conn):
+ # Like the above test but use psycopg3 facilities for compactness
+ conn.pgconn.send_query(
+ b"/* %s */ select pg_sleep(0.01); select 1 as foo;"
+ % (b"x" * 1_000_000)
+ )
+ results = wait_select(conn._exec_gen(conn.pgconn))
+
+ assert len(results) == 2
+ assert results[0].nfields == 1
+ assert results[0].fname(0) == b"pg_sleep"
+ assert results[0].get_value(0, 0) == b""
+ assert results[1].nfields == 1
+ assert results[1].fname(0) == b"foo"
+ assert results[1].get_value(0, 0) == b"1"
+
+
+def test_send_query_params(pq, conn):
+ res = conn.pgconn.send_query_params(b"select $1::int + $2", [b"5", b"3"])
+ (res,) = wait_select(conn._exec_gen(conn.pgconn))
+ assert res.status == pq.ExecStatus.TUPLES_OK
+ assert res.get_value(0, 0) == b"8"