PQresetPoll.argtypes = [PGconn_ptr]
PQresetPoll.restype = c_int
+PQping = pq.PQping
+PQping.argtypes = [c_char_p]
+PQping.restype = c_int
+
# 33.2. Connection Status Functions
from collections import namedtuple
-from .pq_enums import ConnStatus, PostgresPollingStatus
+from .pq_enums import ConnStatus, PostgresPollingStatus, PGPing
from . import _pq_ctypes as impl
rv = impl.PQstatus(self.pgconn_ptr)
return ConnStatus(rv)
+ @classmethod
+ def ping(self, conninfo):
+ if isinstance(conninfo, str):
+ conninfo = conninfo.encode("utf8")
+
+ rv = impl.PQping(conninfo)
+ return PGPing(rv)
+
@property
def error_message(self):
# TODO: decode
PGRES_POLLING_WRITING = auto()
PGRES_POLLING_OK = auto()
PGRES_POLLING_ACTIVE = auto()
+
+
+class PGPing(IntEnum):
+ PQPING_OK = 0
+ PQPING_REJECT = auto()
+ PQPING_NO_RESPONSE = auto()
+ PQPING_NO_ATTEMPT = auto()
import pytest
-from psycopg3.pq_enums import ConnStatus, PostgresPollingStatus
+from psycopg3.pq_enums import ConnStatus, PostgresPollingStatus, PGPing
def test_connectdb(pq, dsn):
assert rv == PostgresPollingStatus.PGRES_POLLING_OK
assert conn.status == ConnStatus.CONNECTION_OK
+
+
+def test_ping(pq, dsn):
+ rv = pq.PGconn.ping(dsn)
+ assert rv == PGPing.PQPING_OK
+
+ rv = pq.PGconn.ping("port=99999")
+ assert rv == PGPing.PQPING_NO_RESPONSE