# 33.2. Connection Status Functions
+PQdb = pq.PQdb
+PQdb.argtypes = [PGconn_ptr]
+PQdb.restype = c_char_p
+
+PQuser = pq.PQuser
+PQuser.argtypes = [PGconn_ptr]
+PQuser.restype = c_char_p
+
+PQpass = pq.PQpass
+PQpass.argtypes = [PGconn_ptr]
+PQpass.restype = c_char_p
+
+PQhost = pq.PQhost
+PQhost.argtypes = [PGconn_ptr]
+PQhost.restype = c_char_p
+
+PQhostaddr = pq.PQhostaddr
+PQhostaddr.argtypes = [PGconn_ptr]
+PQhostaddr.restype = c_char_p
+
+PQport = pq.PQport
+PQport.argtypes = [PGconn_ptr]
+PQport.restype = c_char_p
+
+PQtty = pq.PQtty
+PQtty.argtypes = [PGconn_ptr]
+PQtty.restype = c_char_p
+
+PQoptions = pq.PQoptions
+PQoptions.argtypes = [PGconn_ptr]
+PQoptions.restype = c_char_p
+
PQstatus = pq.PQstatus
PQstatus.argtypes = [PGconn_ptr]
PQstatus.restype = c_int
# Copyright (C) 2020 The Psycopg Team
-from .pq_enums import ConnStatus
+from .pq_enums import ConnStatus, PostgresPollingStatus, PGPing
from . import pq_ctypes as pq_module
PGconn = pq_module.PGconn
PQerror = pq_module.PQerror
-__all__ = ("ConnStatus", "PGconn", "PQerror")
+__all__ = (
+ "ConnStatus",
+ "PostgresPollingStatus",
+ "PGPing",
+ "PGconn",
+ "PQerror",
+)
def connect(cls, conninfo):
if isinstance(conninfo, str):
conninfo = conninfo.encode("utf8")
-
if not isinstance(conninfo, bytes):
raise TypeError("bytes expected, got %r instead" % conninfo)
rv = impl.PQresetPoll(self.pgconn_ptr)
return PostgresPollingStatus(rv)
- @property
- def status(self):
- rv = impl.PQstatus(self.pgconn_ptr)
- return ConnStatus(rv)
-
@classmethod
def ping(self, conninfo):
if isinstance(conninfo, str):
rv = impl.PQping(conninfo)
return PGPing(rv)
+ @property
+ def db(self):
+ return self._decode(impl.PQdb(self.pgconn_ptr))
+
+ @property
+ def user(self):
+ return self._decode(impl.PQuser(self.pgconn_ptr))
+
+ @property
+ def password(self):
+ return self._decode(impl.PQpass(self.pgconn_ptr))
+
+ @property
+ def host(self):
+ return self._decode(impl.PQhost(self.pgconn_ptr))
+
+ @property
+ def hostaddr(self):
+ return self._decode(impl.PQhostaddr(self.pgconn_ptr))
+
+ @property
+ def port(self):
+ return self._decode(impl.PQport(self.pgconn_ptr))
+
+ @property
+ def tty(self):
+ return self._decode(impl.PQtty(self.pgconn_ptr))
+
+ @property
+ def options(self):
+ return self._decode(impl.PQoptions(self.pgconn_ptr))
+
+ @property
+ def status(self):
+ rv = impl.PQstatus(self.pgconn_ptr)
+ return ConnStatus(rv)
+
@property
def error_message(self):
- # TODO: decode
- return impl.PQerrorMessage(self.pgconn_ptr)
+ return self._decode(impl.PQerrorMessage(self.pgconn_ptr))
@property
def socket(self):
return impl.PQsocket(self.pgconn_ptr)
+ def _encode(self, s):
+ # TODO: encode in client encoding
+ return s.encode("utf8")
+
+ def _decode(self, b):
+ if b is None:
+ return None
+ # TODO: decode in client encoding
+ return b.decode("utf8", "replace")
+
ConninfoOption = namedtuple(
"ConninfoOption", "keyword envvar compiled val label dispatcher dispsize"
return pq
-@pytest.fixture()
+@pytest.fixture
def dsn(request):
"""Return the dsn used to connect to the `--test-dsn` database."""
dsn = request.config.getoption("--test-dsn")
if not dsn:
pytest.skip("skipping test as no --test-dsn")
return dsn
+
+
+@pytest.fixture
+def pgconn(pq, dsn):
+ return pq.PGconn.connect(dsn)
assert port.dispsize == 6
-def test_info(pq, dsn):
- conn = pq.PGconn.connect(dsn)
- info = conn.info
+def test_info(dsn, pgconn):
+ info = pgconn.info
assert len(info) > 20
dbname = [d for d in info if d.keyword == "dbname"][0]
assert dbname.envvar == "PGDATABASE"
- assert dbname.val == "psycopg3_test" # TODO: parse from dsn
assert dbname.label == "Database-Name"
assert dbname.dispatcher == ""
assert dbname.dispsize == 20
+ parsed = pgconn.parse_conninfo(dsn)
+ name = [o.val for o in parsed if o.keyword == "dbname"][0]
+ assert dbname.val == name
+
def test_conninfo_parse(pq):
info = pq.PGconn.parse_conninfo(
assert "bad_conninfo" in str(e.value)
-def test_reset(pq, dsn):
- conn = pq.PGconn.connect(dsn)
- assert conn.status == ConnStatus.CONNECTION_OK
+def test_reset(pgconn):
+ assert pgconn.status == ConnStatus.CONNECTION_OK
# TODO: break it
- conn.reset()
- assert conn.status == ConnStatus.CONNECTION_OK
+ pgconn.reset()
+ assert pgconn.status == ConnStatus.CONNECTION_OK
-def test_reset_async(pq, dsn):
- conn = pq.PGconn.connect(dsn)
- assert conn.status == ConnStatus.CONNECTION_OK
+def test_reset_async(pgconn):
+ assert pgconn.status == ConnStatus.CONNECTION_OK
# TODO: break it
- conn.reset_start()
+ pgconn.reset_start()
while 1:
- rv = conn.connect_poll()
+ rv = pgconn.connect_poll()
if rv == PostgresPollingStatus.PGRES_POLLING_READING:
- select([conn.socket], [], [])
+ select([pgconn.socket], [], [])
elif rv == PostgresPollingStatus.PGRES_POLLING_WRITING:
- select([], [conn.socket], [])
+ select([], [pgconn.socket], [])
else:
break
assert rv == PostgresPollingStatus.PGRES_POLLING_OK
- assert conn.status == ConnStatus.CONNECTION_OK
+ assert pgconn.status == ConnStatus.CONNECTION_OK
def test_ping(pq, dsn):
rv = pq.PGconn.ping("port=99999")
assert rv == PGPing.PQPING_NO_RESPONSE
+
+
+def test_db(pgconn):
+ name = [o.val for o in pgconn.info if o.keyword == "dbname"][0]
+ assert pgconn.db == name
+
+
+def test_user(pgconn):
+ user = [o.val for o in pgconn.info if o.keyword == "user"][0]
+ assert pgconn.user == user
+
+
+def test_password(pgconn):
+ # not in info
+ assert isinstance(pgconn.password, str)
+
+
+def test_host(pgconn):
+ # might be not in info
+ assert isinstance(pgconn.host, str)
+
+
+def test_hostaddr(pgconn):
+ # not in info
+ assert isinstance(pgconn.hostaddr, str)
+
+
+def test_tty(pgconn):
+ tty = [o.val for o in pgconn.info if o.keyword == "tty"][0]
+ assert pgconn.tty == tty