impl.PQfinish(self.pgconn_ptr)
self.pgconn_ptr = NULL
+ @property
+ def pgconn_ptr(self) -> Optional[int]:
+ if self.pgconn_ptr:
+ return <long><void *>self.pgconn_ptr
+ else:
+ return None
+
@property
def info(self) -> List["ConninfoOption"]:
self._ensure_pgconn()
impl.PQclear(self.pgresult_ptr)
self.pgresult_ptr = NULL
+ @property
+ def pgresult_ptr(self) -> Optional[int]:
+ if self.pgresult_ptr:
+ return <long><void *>self.pgresult_ptr
+ else:
+ return None
+
@property
def status(self) -> ExecStatus:
cdef int rv = impl.PQresultStatus(self.pgresult_ptr)
return pq
+@pytest.fixture
+def libpq():
+ """Return a ctypes wrapper to access the libpq."""
+ import ctypes.util
+
+ libname = ctypes.util.find_library("pq")
+ return ctypes.pydll.LoadLibrary(libname)
+
+
def check_libpq_version(got, want):
"""
Verify if the libpq version is a version accepted.
import gc
import os
+import ctypes
import logging
import weakref
from select import select
assert w() is None
+def test_pgconn_ptr(pgconn, libpq):
+ pgconn.pgconn_ptr is not None
+
+ f = libpq.PQserverVersion
+ f.argtypes = [ctypes.c_void_p]
+ f.restype = ctypes.c_int
+ ver = f(pgconn.pgconn_ptr)
+ assert ver == pgconn.server_version
+
+ pgconn.finish()
+ assert pgconn.pgconn_ptr is None
+
+
def test_info(pq, dsn, pgconn):
info = pgconn.info
assert len(info) > 20
+import ctypes
import pytest
assert res.status == pq.ExecStatus.FATAL_ERROR
+def test_pgresult_ptr(pgconn, libpq):
+ res = pgconn.exec_(b"select 1")
+ assert res.pgresult_ptr is not None
+
+ f = libpq.PQcmdStatus
+ f.argtypes = [ctypes.c_void_p]
+ f.restype = ctypes.c_char_p
+ assert f(res.pgresult_ptr) == b"SELECT 1"
+
+ res.clear()
+ assert res.pgresult_ptr is None
+
+
def test_error_message(pgconn):
res = pgconn.exec_(b"select 1")
assert res.error_message == b""