Can send and return binary data too.
import ctypes
import ctypes.util
from ctypes import Structure, POINTER
-from ctypes import c_char_p, c_int, c_void_p
+from ctypes import c_char, c_char_p, c_int, c_uint, c_void_p
pq = ctypes.pydll.LoadLibrary(ctypes.util.find_library("pq"))
# libpq data types
+Oid = c_uint
+
+
class PGconn_struct(Structure):
_fields_ = []
PQexec.argtypes = [PGconn_ptr, c_char_p]
PQexec.restype = PGresult_ptr
+PQexecParams = pq.PQexecParams
+PQexecParams.argtypes = [
+ PGconn_ptr,
+ c_char_p,
+ c_int,
+ POINTER(Oid),
+ POINTER(c_char_p),
+ POINTER(c_int),
+ POINTER(c_int),
+ c_int,
+]
+PQexecParams.restype = PGresult_ptr
+
PQresultStatus = pq.PQresultStatus
PQresultStatus.argtypes = [PGresult_ptr]
PQresultStatus.restype = c_int
+# PQresStatus: not needed, we have pretty enums
+
+PQresultErrorMessage = pq.PQresultErrorMessage
+PQresultErrorMessage.argtypes = [PGresult_ptr]
+PQresultErrorMessage.restype = c_char_p
+
+# TODO: PQresultVerboseErrorMessage
+
+PQresultErrorField = pq.PQresultErrorField
+PQresultErrorField.argtypes = [PGresult_ptr, c_int]
+PQresultErrorField.restype = c_char_p
+
PQclear = pq.PQclear
PQclear.argtypes = [PGresult_ptr]
PQclear.restype = None
+# 33.3.2. Retrieving Query Result Information
+
+PQntuples = pq.PQntuples
+PQntuples.argtypes = [PGresult_ptr]
+PQntuples.restype = c_int
+
+PQnfields = pq.PQnfields
+PQnfields.argtypes = [PGresult_ptr]
+PQnfields.restype = c_int
+
+PQfname = pq.PQfname
+PQfname.argtypes = [PGresult_ptr, c_int]
+PQfname.restype = c_char_p
+
+# PQfnumber: useless and hard to use
+
+PQftable = pq.PQftable
+PQftable.argtypes = [PGresult_ptr, c_int]
+PQftable.restype = Oid
+
+PQftablecol = pq.PQftablecol
+PQftablecol.argtypes = [PGresult_ptr, c_int]
+PQftablecol.restype = c_int
+
+PQfformat = pq.PQfformat
+PQfformat.argtypes = [PGresult_ptr, c_int]
+PQfformat.restype = c_int
+
+PQftype = pq.PQftype
+PQftype.argtypes = [PGresult_ptr, c_int]
+PQftype.restype = Oid
+
+PQfmod = pq.PQfmod
+PQfmod.argtypes = [PGresult_ptr, c_int]
+PQfmod.restype = c_int
+
+PQfsize = pq.PQfsize
+PQfsize.argtypes = [PGresult_ptr, c_int]
+PQfsize.restype = c_int
+
+PQbinaryTuples = pq.PQbinaryTuples
+PQbinaryTuples.argtypes = [PGresult_ptr]
+PQbinaryTuples.restype = c_int
+
+PQgetvalue = pq.PQgetvalue
+PQgetvalue.argtypes = [PGresult_ptr, c_int, c_int]
+PQgetvalue.restype = POINTER(c_char) # not a null-terminated string
+
+PQgetisnull = pq.PQgetisnull
+PQgetisnull.argtypes = [PGresult_ptr, c_int, c_int]
+PQgetisnull.restype = c_int
+
+PQgetlength = pq.PQgetlength
+PQgetlength.argtypes = [PGresult_ptr, c_int, c_int]
+PQgetlength.restype = c_int
+
+PQnparams = pq.PQnparams
+PQnparams.argtypes = [PGresult_ptr]
+PQnparams.restype = c_int
+
+PQparamtype = pq.PQparamtype
+PQparamtype.argtypes = [PGresult_ptr, c_int]
+PQparamtype.restype = Oid
+
+# PQprint: pretty useless
+
+
# 33.11. Miscellaneous Functions
PQfreemem = pq.PQfreemem
PQPING_REJECT = auto()
PQPING_NO_RESPONSE = auto()
PQPING_NO_ATTEMPT = auto()
+
+
+class DiagnosticField(IntEnum):
+ # from postgres_ext.h
+ PG_DIAG_SEVERITY = ord("S")
+ PG_DIAG_SEVERITY_NONLOCALIZED = ord("V")
+ PG_DIAG_SQLSTATE = ord("C")
+ PG_DIAG_MESSAGE_PRIMARY = ord("M")
+ PG_DIAG_MESSAGE_DETAIL = ord("D")
+ PG_DIAG_MESSAGE_HINT = ord("H")
+ PG_DIAG_STATEMENT_POSITION = ord("P")
+ PG_DIAG_INTERNAL_POSITION = ord("p")
+ PG_DIAG_INTERNAL_QUERY = ord("q")
+ PG_DIAG_CONTEXT = ord("W")
+ PG_DIAG_SCHEMA_NAME = ord("s")
+ PG_DIAG_TABLE_NAME = ord("t")
+ PG_DIAG_COLUMN_NAME = ord("c")
+ PG_DIAG_DATATYPE_NAME = ord("d")
+ PG_DIAG_CONSTRAINT_NAME = ord("n")
+ PG_DIAG_SOURCE_FILE = ord("F")
+ PG_DIAG_SOURCE_LINE = ord("L")
+ PG_DIAG_SOURCE_FUNCTION = ord("R")
# Copyright (C) 2020 The Psycopg Team
from collections import namedtuple
-from ctypes import c_char_p, pointer
+from ctypes import string_at
+from ctypes import c_char_p, c_int, pointer
from .enums import (
ConnStatus,
raise MemoryError("couldn't allocate PGresult")
return PGresult(rv)
+ def exec_params(
+ self,
+ command,
+ param_values,
+ param_types=None,
+ param_formats=None,
+ result_format=0,
+ ):
+ if not isinstance(command, bytes):
+ raise TypeError(f"bytes expected, got {command!r} instead")
+
+ nparams = len(param_values)
+ if nparams:
+ aparams = (c_char_p * nparams)(*param_values)
+ alenghts = (c_int * nparams)(
+ *(len(p) if p is not None else 0 for p in param_values)
+ )
+ else:
+ aparams = alenghts = None
+
+ if param_types is None:
+ atypes = None
+ else:
+ if len(param_types) != nparams:
+ raise ValueError(
+ "got %d param_values but %d param_types"
+ % (nparams, len(param_types))
+ )
+ atypes = (impl.Oid * nparams)(*param_types)
+
+ if param_formats is None:
+ aformats = None
+ else:
+ if len(param_formats) != nparams:
+ raise ValueError(
+ "got %d param_values but %d param_types"
+ % (nparams, len(param_formats))
+ )
+ aformats = (c_int * nparams)(*param_formats)
+
+ rv = impl.PQexecParams(
+ self.pgconn_ptr,
+ command,
+ nparams,
+ atypes,
+ aparams,
+ alenghts,
+ aformats,
+ result_format,
+ )
+ if rv is None:
+ raise MemoryError("couldn't allocate PGresult")
+ return PGresult(rv)
+
class PGresult:
__slots__ = ("pgresult_ptr",)
rv = impl.PQresultStatus(self.pgresult_ptr)
return ExecStatus(rv)
+ @property
+ def error_message(self):
+ return impl.PQresultErrorMessage(self.pgresult_ptr)
+
+ def error_field(self, fieldcode):
+ return impl.PQresultErrorField(self.pgresult_ptr, fieldcode)
+
+ @property
+ def ntuples(self):
+ return impl.PQntuples(self.pgresult_ptr)
+
+ @property
+ def nfields(self):
+ return impl.PQnfields(self.pgresult_ptr)
+
+ def fname(self, column_number):
+ return impl.PQfname(self.pgresult_ptr, column_number)
+
+ def ftable(self, column_number):
+ return impl.PQftable(self.pgresult_ptr, column_number)
+
+ def ftablecol(self, column_number):
+ return impl.PQftablecol(self.pgresult_ptr, column_number)
+
+ def fformat(self, column_number):
+ return impl.PQfformat(self.pgresult_ptr, column_number)
+
+ def ftype(self, column_number):
+ return impl.PQftype(self.pgresult_ptr, column_number)
+
+ def fmod(self, column_number):
+ return impl.PQfmod(self.pgresult_ptr, column_number)
+
+ def fsize(self, column_number):
+ return impl.PQfsize(self.pgresult_ptr, column_number)
+
+ @property
+ def binary_tuples(self):
+ return impl.PQbinaryTuples(self.pgresult_ptr)
+
+ def get_value(self, row_number, column_number):
+ length = impl.PQgetlength(self.pgresult_ptr, row_number, column_number)
+ if length:
+ v = impl.PQgetvalue(self.pgresult_ptr, row_number, column_number)
+ return string_at(v, length)
+ else:
+ if impl.PQgetisnull(self.pgresult_ptr, row_number, column_number):
+ return None
+ else:
+ return b""
+
ConninfoOption = namedtuple(
"ConninfoOption", "keyword envvar compiled val label dispatcher dispsize"
def test_exec_error(pq, pgconn):
res = pgconn.exec_(b"wat")
assert res.status == pq.ExecStatus.PGRES_FATAL_ERROR
+
+
+def test_exec_params(pq, pgconn):
+ res = pgconn.exec_params(b"select $1::int + $2", [b"5", b"3"])
+ assert res.status == pq.ExecStatus.PGRES_TUPLES_OK
+ assert res.nfields == 1
+ assert res.ntuples == 1
+ assert res.get_value(0, 0) == b"8"
+
+
+def test_exec_params_empty(pq, pgconn):
+ res = pgconn.exec_params(b"select 8::int", [])
+ assert res.status == pq.ExecStatus.PGRES_TUPLES_OK
+ assert res.nfields == 1
+ assert res.ntuples == 1
+ assert res.get_value(0, 0) == b"8"
+
+
+def test_exec_params_types(pq, pgconn):
+ res = pgconn.exec_params(b"select $1, $2", [b"8", b"8"], [1700, 23])
+ assert res.status == pq.ExecStatus.PGRES_TUPLES_OK
+ assert res.nfields == 2
+ assert res.ntuples == 1
+ assert res.get_value(0, 0) == b"8"
+ assert res.ftype(0) == 1700
+ assert res.get_value(0, 1) == b"8"
+ assert res.ftype(1) == 23
+
+ with pytest.raises(ValueError):
+ pgconn.exec_params(b"select $1, $2", [b"8", b"8"], [1700])
+
+
+def test_exec_params_nulls(pq, pgconn):
+ res = pgconn.exec_params(b"select $1, $2, $3", [b"hi", b"", None])
+ assert res.status == pq.ExecStatus.PGRES_TUPLES_OK
+ assert res.nfields == 3
+ assert res.ntuples == 1
+ assert res.get_value(0, 0) == b"hi"
+ assert res.get_value(0, 1) == b""
+ assert res.get_value(0, 2) is None
+
+
+def test_exec_params_binary_in(pq, pgconn):
+ val = b"foo\00bar"
+ res = pgconn.exec_params(
+ b"select length($1::bytea), length($2::bytea)",
+ [val, val],
+ param_formats=[0, 1],
+ )
+ assert res.status == pq.ExecStatus.PGRES_TUPLES_OK
+ assert res.get_value(0, 0) == b"3"
+ assert res.get_value(0, 1) == b"7"
+
+ with pytest.raises(ValueError):
+ pgconn.exec_params(b"select $1::bytea", [val], param_formats=[1, 1])
+
+
+@pytest.mark.parametrize(
+ "fmt, out", [(0, b"\\x666f6f00626172"), (1, b"foo\00bar")]
+)
+def test_exec_params_binary_out(pq, pgconn, fmt, out):
+ val = b"foo\00bar"
+ res = pgconn.exec_params(
+ b"select $1::bytea", [val], param_formats=[1], result_format=fmt
+ )
+ assert res.status == pq.ExecStatus.PGRES_TUPLES_OK
+ assert res.get_value(0, 0) == out