def error_message(self):
return impl.PQerrorMessage(self.pgconn_ptr)
+ @property
+ def error_str(self):
+ rv = self.error_message
+ if rv:
+ return rv.encode('utf8', 'replace').rstrip()
+ else:
+ return "no details available"
+
@property
def socket(self):
return impl.PQsocket(self.pgconn_ptr)
return PGresult(rv) if rv else None
def consume_input(self):
- return impl.PQconsumeInput(self.pgconn_ptr)
+ if 1 != impl.PQconsumeInput(self.pgconn_ptr):
+ raise PQerror(f"consuming input failed: {self.error_str}")
def is_busy(self):
return impl.PQisBusy(self.pgconn_ptr)
- def is_non_blocking(self):
+ @property
+ def nonblocking(self):
return impl.PQisnonblocking(self.pgconn_ptr)
- def set_non_blocking(self, arg):
- return impl.PQsetnonblocking(self.pgconn_ptr, arg)
+ @nonblocking.setter
+ def nonblocking(self, arg):
+ if 0 > impl.PQsetnonblocking(self.pgconn_ptr, arg):
+ raise PQerror(f"setting nonblocking failed: {self.error_str}")
def flush(self):
- return impl.PQflush(self.pgconn_ptr)
+ rv = impl.PQflush(self.pgconn_ptr)
+ if rv < 0:
+ raise PQerror(f"flushing failed: {self.error_str}")
+ return rv
class PGresult:
def test_send_query(pq, pgconn):
# This test shows how to process an async query in all its glory
- pgconn.set_non_blocking(1)
+ pgconn.nonblocking = 1
# Long query to make sure we have to wait on send
pgconn.send_query(
waited_on_send = 0
while 1:
f = pgconn.flush()
- assert f != -1
if f == 0:
break
if wl:
continue # call flush again()
if rl:
- assert pgconn.consume_input() == 1, pgconn.error_message
+ pgconn.consume_input()
continue
assert waited_on_send
# read loop
results = []
while 1:
- assert pgconn.consume_input() == 1, pgconn.error_message
+ pgconn.consume_input()
if pgconn.is_busy():
select([pgconn.socket], [], [])
continue