PQfinish.argtypes = [PGconn_ptr]
PQfinish.restype = None
+PQreset = pq.PQreset
+PQreset.argtypes = [PGconn_ptr]
+PQreset.restype = None
+
+PQresetStart = pq.PQresetStart
+PQresetStart.argtypes = [PGconn_ptr]
+PQresetStart.restype = c_int
+
+PQresetPoll = pq.PQresetPoll
+PQresetPoll.argtypes = [PGconn_ptr]
+PQresetPoll.restype = c_int
+
# 33.2. Connection Status Functions
from . import _pq_ctypes as impl
+class PQerror(Exception):
+ pass
+
+
class PGconn:
__slots__ = ("pgconn_ptr",)
finally:
impl.PQconninfoFree(opts)
+ def reset(self):
+ impl.PQreset(self.pgconn_ptr)
+
+ def reset_start(self):
+ rv = impl.PQresetStart(self.pgconn_ptr)
+ if rv == 0:
+ raise PQerror("couldn't reset connection")
+
+ def reset_poll(self):
+ rv = impl.PQresetPoll(self.pgconn_ptr)
+ return PostgresPollingStatus(rv)
+
@property
def status(self):
rv = impl.PQstatus(self.pgconn_ptr)
assert dbname.label == "Database-Name"
assert dbname.dispatcher == ""
assert dbname.dispsize == 20
+
+
+def test_reset(pq, dsn):
+ conn = pq.PGconn.connect(dsn)
+ assert conn.status == ConnStatus.CONNECTION_OK
+ # TODO: break it
+ conn.reset()
+ assert conn.status == ConnStatus.CONNECTION_OK
+
+
+def test_reset_async(pq, dsn):
+ conn = pq.PGconn.connect(dsn)
+ assert conn.status == ConnStatus.CONNECTION_OK
+ # TODO: break it
+ conn.reset_start()
+ while 1:
+ rv = conn.connect_poll()
+ if rv == PostgresPollingStatus.PGRES_POLLING_READING:
+ select([conn.socket], [], [])
+ elif rv == PostgresPollingStatus.PGRES_POLLING_WRITING:
+ select([], [conn.socket], [])
+ else:
+ break
+
+ assert rv == PostgresPollingStatus.PGRES_POLLING_OK
+ assert conn.status == ConnStatus.CONNECTION_OK