import pytest
-from psycopg3.pq_enums import ConnStatus, PollingStatus, Ping
-
def test_connectdb(pq, dsn):
conn = pq.PGconn.connect(dsn)
- assert conn.status == ConnStatus.CONNECTION_OK, conn.error_message
+ assert conn.status == pq.ConnStatus.CONNECTION_OK, conn.error_message
def test_connectdb_bytes(pq, dsn):
conn = pq.PGconn.connect(dsn.encode("utf8"))
- assert conn.status == ConnStatus.CONNECTION_OK, conn.error_message
+ assert conn.status == pq.ConnStatus.CONNECTION_OK, conn.error_message
def test_connectdb_error(pq):
conn = pq.PGconn.connect("dbname=psycopg3_test_not_for_real")
- assert conn.status == ConnStatus.CONNECTION_BAD
+ assert conn.status == pq.ConnStatus.CONNECTION_BAD
@pytest.mark.parametrize("baddsn", [None, 42])
def test_connect_async(pq, dsn):
conn = pq.PGconn.connect_start(dsn)
while 1:
- assert conn.status != ConnStatus.CONNECTION_BAD
+ assert conn.status != pq.ConnStatus.CONNECTION_BAD
rv = conn.connect_poll()
- if rv == PollingStatus.PGRES_POLLING_OK:
+ if rv == pq.PollingStatus.PGRES_POLLING_OK:
break
- elif rv == PollingStatus.PGRES_POLLING_READING:
+ elif rv == pq.PollingStatus.PGRES_POLLING_READING:
select([conn.socket], [], [])
- elif rv == PollingStatus.PGRES_POLLING_WRITING:
+ elif rv == pq.PollingStatus.PGRES_POLLING_WRITING:
select([], [conn.socket], [])
else:
assert False, rv
- assert conn.status == ConnStatus.CONNECTION_OK
+ assert conn.status == pq.ConnStatus.CONNECTION_OK
def test_connect_async_bad(pq, dsn):
conn = pq.PGconn.connect_start("dbname=psycopg3_test_not_for_real")
while 1:
- assert conn.status != ConnStatus.CONNECTION_BAD
+ assert conn.status != pq.ConnStatus.CONNECTION_BAD
rv = conn.connect_poll()
- if rv == PollingStatus.PGRES_POLLING_FAILED:
+ if rv == pq.PollingStatus.PGRES_POLLING_FAILED:
break
- elif rv == PollingStatus.PGRES_POLLING_READING:
+ elif rv == pq.PollingStatus.PGRES_POLLING_READING:
select([conn.socket], [], [])
- elif rv == PollingStatus.PGRES_POLLING_WRITING:
+ elif rv == pq.PollingStatus.PGRES_POLLING_WRITING:
select([], [conn.socket], [])
else:
assert False, rv
- assert conn.status == ConnStatus.CONNECTION_BAD
+ assert conn.status == pq.ConnStatus.CONNECTION_BAD
def test_info(pq, dsn, pgconn):
assert dbname.val == name
-def test_reset(pgconn):
- assert pgconn.status == ConnStatus.CONNECTION_OK
+def test_reset(pq, pgconn):
+ assert pgconn.status == pq.ConnStatus.CONNECTION_OK
# TODO: break it
pgconn.reset()
- assert pgconn.status == ConnStatus.CONNECTION_OK
+ assert pgconn.status == pq.ConnStatus.CONNECTION_OK
-def test_reset_async(pgconn):
- assert pgconn.status == ConnStatus.CONNECTION_OK
+def test_reset_async(pq, pgconn):
+ assert pgconn.status == pq.ConnStatus.CONNECTION_OK
# TODO: break it
pgconn.reset_start()
while 1:
rv = pgconn.connect_poll()
- if rv == PollingStatus.PGRES_POLLING_READING:
+ if rv == pq.PollingStatus.PGRES_POLLING_READING:
select([pgconn.socket], [], [])
- elif rv == PollingStatus.PGRES_POLLING_WRITING:
+ elif rv == pq.PollingStatus.PGRES_POLLING_WRITING:
select([], [pgconn.socket], [])
else:
break
- assert rv == PollingStatus.PGRES_POLLING_OK
- assert pgconn.status == ConnStatus.CONNECTION_OK
+ assert rv == pq.PollingStatus.PGRES_POLLING_OK
+ assert pgconn.status == pq.ConnStatus.CONNECTION_OK
def test_ping(pq, dsn):
rv = pq.PGconn.ping(dsn)
- assert rv == Ping.PQPING_OK
+ assert rv == pq.Ping.PQPING_OK
rv = pq.PGconn.ping("port=99999")
- assert rv == Ping.PQPING_NO_RESPONSE
+ assert rv == pq.Ping.PQPING_NO_RESPONSE
def test_db(pgconn):