from os import getpid
from weakref import ref
-from ctypes import Array, POINTER, cast, pointer, string_at, create_string_buffer, byref
+from ctypes import Array, POINTER, cast, string_at, create_string_buffer, byref
from ctypes import addressof, c_char_p, c_int, c_size_t, c_ulong, c_void_p, py_object
from typing import Any, Callable, List, Optional, Sequence, Tuple
from typing import cast as t_cast, TYPE_CHECKING
See :pq:`PQcancel()` for details.
"""
buf = create_string_buffer(256)
- res = impl.PQcancel(self.pgcancel_ptr, pointer(buf), len(buf)) # type: ignore
+ res = impl.PQcancel(
+ self.pgcancel_ptr,
+ byref(buf), # type: ignore[arg-type]
+ len(buf),
+ )
if not res:
raise e.OperationalError(
f"cancel failed: {buf.value.decode('utf8', 'ignore')}"
raise TypeError(f"bytes expected, got {type(conninfo)} instead")
errmsg = c_char_p()
- rv = impl.PQconninfoParse(conninfo, pointer(errmsg))
+ rv = impl.PQconninfoParse(conninfo, byref(errmsg)) # type: ignore[arg-type]
if not rv:
if not errmsg:
raise MemoryError("couldn't allocate on conninfo parse")
out = create_string_buffer(len(data) * 2 + 1)
impl.PQescapeStringConn(
self.conn._pgconn_ptr,
- pointer(out), # type: ignore
+ byref(out), # type: ignore[arg-type]
data,
len(data),
- pointer(error),
+ byref(error), # type: ignore[arg-type]
)
if error:
else:
out = create_string_buffer(len(data) * 2 + 1)
impl.PQescapeString(
- pointer(out), # type: ignore
+ byref(out), # type: ignore[arg-type]
data,
len(data),
)
self.conn._pgconn_ptr,
data,
len(data),
- pointer(t_cast(c_ulong, len_out)),
+ byref(t_cast(c_ulong, len_out)), # type: ignore[arg-type]
)
else:
- out = impl.PQescapeBytea(data, len(data), pointer(t_cast(c_ulong, len_out)))
+ out = impl.PQescapeBytea(
+ data,
+ len(data),
+ byref(t_cast(c_ulong, len_out)), # type: ignore[arg-type]
+ )
if not out:
raise MemoryError(
f"couldn't allocate for escape_bytea of {len(data)} bytes"
self.conn._ensure_pgconn()
len_out = c_size_t()
- out = impl.PQunescapeBytea(data, pointer(t_cast(c_ulong, len_out)))
+ out = impl.PQunescapeBytea(
+ data,
+ byref(t_cast(c_ulong, len_out)), # type: ignore[arg-type]
+ )
if not out:
raise MemoryError(
f"couldn't allocate for unescape_bytea of {len(data)} bytes"