cdef class PGconn:
@staticmethod
- cdef PGconn _from_ptr(impl.PGconn *ptr):
+ cdef PGconn _from_ptr(libpq.PGconn *ptr):
cdef PGconn rv = PGconn.__new__(PGconn)
rv.pgconn_ptr = ptr
- impl.PQsetNoticeReceiver(ptr, notice_receiver, <void *>rv)
+ libpq.PQsetNoticeReceiver(ptr, notice_receiver, <void *>rv)
return rv
def __cinit__(self):
@classmethod
def connect(cls, const char *conninfo) -> PGconn:
- cdef impl.PGconn* pgconn = impl.PQconnectdb(conninfo)
+ cdef libpq.PGconn* pgconn = libpq.PQconnectdb(conninfo)
if not pgconn:
raise MemoryError("couldn't allocate PGconn")
@classmethod
def connect_start(cls, const char *conninfo) -> PGconn:
- cdef impl.PGconn* pgconn = impl.PQconnectStart(conninfo)
+ cdef libpq.PGconn* pgconn = libpq.PQconnectStart(conninfo)
if not pgconn:
raise MemoryError("couldn't allocate PGconn")
return PGconn._from_ptr(pgconn)
def connect_poll(self) -> PollingStatus:
- cdef int rv = self._call_int(<conn_int_f>impl.PQconnectPoll)
+ cdef int rv = self._call_int(<conn_int_f>libpq.PQconnectPoll)
return PollingStatus(rv)
def finish(self) -> None:
if self.pgconn_ptr is not NULL:
- impl.PQfinish(self.pgconn_ptr)
+ libpq.PQfinish(self.pgconn_ptr)
self.pgconn_ptr = NULL
@property
@property
def info(self) -> List["ConninfoOption"]:
self._ensure_pgconn()
- cdef impl.PQconninfoOption *opts = impl.PQconninfo(self.pgconn_ptr)
+ cdef libpq.PQconninfoOption *opts = libpq.PQconninfo(self.pgconn_ptr)
if opts is NULL:
raise MemoryError("couldn't allocate connection info")
rv = _options_from_array(opts)
- impl.PQconninfoFree(opts)
+ libpq.PQconninfoFree(opts)
return rv
def reset(self) -> None:
self._ensure_pgconn()
- impl.PQreset(self.pgconn_ptr)
+ libpq.PQreset(self.pgconn_ptr)
def reset_start(self) -> None:
- if not impl.PQresetStart(self.pgconn_ptr):
+ if not libpq.PQresetStart(self.pgconn_ptr):
raise PQerror("couldn't reset connection")
def reset_poll(self) -> PollingStatus:
- cdef int rv = self._call_int(<conn_int_f>impl.PQresetPoll)
+ cdef int rv = self._call_int(<conn_int_f>libpq.PQresetPoll)
return PollingStatus(rv)
@classmethod
def ping(self, const char *conninfo) -> Ping:
- cdef int rv = impl.PQping(conninfo)
+ cdef int rv = libpq.PQping(conninfo)
return Ping(rv)
@property
def db(self) -> bytes:
- return self._call_bytes(impl.PQdb)
+ return self._call_bytes(libpq.PQdb)
@property
def user(self) -> bytes:
- return self._call_bytes(impl.PQuser)
+ return self._call_bytes(libpq.PQuser)
@property
def password(self) -> bytes:
- return self._call_bytes(impl.PQpass)
+ return self._call_bytes(libpq.PQpass)
@property
def host(self) -> bytes:
- return self._call_bytes(impl.PQhost)
+ return self._call_bytes(libpq.PQhost)
@property
def hostaddr(self) -> bytes:
@property
def port(self) -> bytes:
- return self._call_bytes(impl.PQport)
+ return self._call_bytes(libpq.PQport)
@property
def tty(self) -> bytes:
- return self._call_bytes(impl.PQtty)
+ return self._call_bytes(libpq.PQtty)
@property
def options(self) -> bytes:
- return self._call_bytes(impl.PQoptions)
+ return self._call_bytes(libpq.PQoptions)
@property
def status(self) -> ConnStatus:
- cdef int rv = impl.PQstatus(self.pgconn_ptr)
+ cdef int rv = libpq.PQstatus(self.pgconn_ptr)
return ConnStatus(rv)
@property
def transaction_status(self) -> TransactionStatus:
- cdef int rv = impl.PQtransactionStatus(self.pgconn_ptr)
+ cdef int rv = libpq.PQtransactionStatus(self.pgconn_ptr)
return TransactionStatus(rv)
def parameter_status(self, name: bytes) -> Optional[bytes]:
self._ensure_pgconn()
- cdef const char *rv = impl.PQparameterStatus(self.pgconn_ptr, name)
+ cdef const char *rv = libpq.PQparameterStatus(self.pgconn_ptr, name)
if rv is not NULL:
return rv
else:
@property
def error_message(self) -> bytes:
- return impl.PQerrorMessage(self.pgconn_ptr)
+ return libpq.PQerrorMessage(self.pgconn_ptr)
@property
def protocol_version(self) -> int:
- return self._call_int(impl.PQprotocolVersion)
+ return self._call_int(libpq.PQprotocolVersion)
@property
def server_version(self) -> int:
- return self._call_int(impl.PQserverVersion)
+ return self._call_int(libpq.PQserverVersion)
@property
def socket(self) -> int:
- return self._call_int(impl.PQsocket)
+ return self._call_int(libpq.PQsocket)
@property
def backend_pid(self) -> int:
- return self._call_int(impl.PQbackendPID)
+ return self._call_int(libpq.PQbackendPID)
@property
def needs_password(self) -> bool:
- return bool(self._call_int(impl.PQconnectionNeedsPassword))
+ return bool(self._call_int(libpq.PQconnectionNeedsPassword))
@property
def used_password(self) -> bool:
- return bool(self._call_int(impl.PQconnectionUsedPassword))
+ return bool(self._call_int(libpq.PQconnectionUsedPassword))
@property
def ssl_in_use(self) -> bool:
- return bool(self._call_int(<conn_int_f>impl.PQsslInUse))
+ return bool(self._call_int(<conn_int_f>libpq.PQsslInUse))
def exec_(self, command: bytes) -> PGresult:
self._ensure_pgconn()
- cdef impl.PGresult *pgresult = impl.PQexec(self.pgconn_ptr, command)
+ cdef libpq.PGresult *pgresult = libpq.PQexec(self.pgconn_ptr, command)
if pgresult is NULL:
raise MemoryError("couldn't allocate PGresult")
def send_query(self, command: bytes) -> None:
self._ensure_pgconn()
- if not impl.PQsendQuery(self.pgconn_ptr, command):
+ if not libpq.PQsendQuery(self.pgconn_ptr, command):
raise PQerror(f"sending query failed: {error_message(self)}")
def exec_params(
cnparams, ctypes, cvalues, clengths, cformats = _query_params_args(
param_values, param_types, param_formats)
- cdef impl.PGresult *pgresult = impl.PQexecParams(
+ cdef libpq.PGresult *pgresult = libpq.PQexecParams(
self.pgconn_ptr, command, cnparams, ctypes,
<const char *const *>cvalues, clengths, cformats, result_format)
_clear_query_params(ctypes, cvalues, clengths, cformats)
cnparams, ctypes, cvalues, clengths, cformats = _query_params_args(
param_values, param_types, param_formats)
- cdef int rv = impl.PQsendQueryParams(
+ cdef int rv = libpq.PQsendQueryParams(
self.pgconn_ptr, command, cnparams, ctypes,
<const char *const *>cvalues,
clengths, cformats, result_format)
for i in range(nparams):
atypes[i] = param_types[i]
- cdef int rv = impl.PQsendPrepare(
+ cdef int rv = libpq.PQsendPrepare(
self.pgconn_ptr, name, command, nparams, atypes
)
PyMem_Free(atypes)
cnparams, ctypes, cvalues, clengths, cformats = _query_params_args(
param_values, None, param_formats)
- cdef int rv = impl.PQsendQueryPrepared(
+ cdef int rv = libpq.PQsendQueryPrepared(
self.pgconn_ptr, name, cnparams,
<const char *const *>cvalues,
clengths, cformats, result_format)
for i in range(nparams):
atypes[i] = param_types[i]
- cdef impl.PGresult *rv = impl.PQprepare(
+ cdef libpq.PGresult *rv = libpq.PQprepare(
self.pgconn_ptr, name, command, nparams, atypes)
PyMem_Free(atypes)
if rv is NULL:
cnparams, ctypes, cvalues, clengths, cformats = _query_params_args(
param_values, None, param_formats)
- cdef impl.PGresult *rv = impl.PQexecPrepared(
+ cdef libpq.PGresult *rv = libpq.PQexecPrepared(
self.pgconn_ptr, name, cnparams,
<const char *const *>cvalues,
clengths, cformats, result_format)
def describe_prepared(self, name: bytes) -> PGresult:
self._ensure_pgconn()
- cdef impl.PGresult *rv = impl.PQdescribePrepared(self.pgconn_ptr, name)
+ cdef libpq.PGresult *rv = libpq.PQdescribePrepared(self.pgconn_ptr, name)
if rv is NULL:
raise MemoryError("couldn't allocate PGresult")
return PGresult._from_ptr(rv)
def describe_portal(self, name: bytes) -> PGresult:
self._ensure_pgconn()
- cdef impl.PGresult *rv = impl.PQdescribePortal(self.pgconn_ptr, name)
+ cdef libpq.PGresult *rv = libpq.PQdescribePortal(self.pgconn_ptr, name)
if rv is NULL:
raise MemoryError("couldn't allocate PGresult")
return PGresult._from_ptr(rv)
def get_result(self) -> Optional["PGresult"]:
- cdef impl.PGresult *pgresult = impl.PQgetResult(self.pgconn_ptr)
+ cdef libpq.PGresult *pgresult = libpq.PQgetResult(self.pgconn_ptr)
if pgresult is NULL:
return None
return PGresult._from_ptr(pgresult)
def consume_input(self) -> None:
- if 1 != impl.PQconsumeInput(self.pgconn_ptr):
+ if 1 != libpq.PQconsumeInput(self.pgconn_ptr):
raise PQerror(f"consuming input failed: {error_message(self)}")
def is_busy(self) -> int:
cdef int rv
with nogil:
- rv = impl.PQisBusy(self.pgconn_ptr)
+ rv = libpq.PQisBusy(self.pgconn_ptr)
return rv
@property
def nonblocking(self) -> int:
- return impl.PQisnonblocking(self.pgconn_ptr)
+ return libpq.PQisnonblocking(self.pgconn_ptr)
@nonblocking.setter
def nonblocking(self, arg: int) -> None:
- if 0 > impl.PQsetnonblocking(self.pgconn_ptr, arg):
+ if 0 > libpq.PQsetnonblocking(self.pgconn_ptr, arg):
raise PQerror(f"setting nonblocking failed: {error_message(self)}")
def flush(self) -> int:
- cdef int rv = impl.PQflush(self.pgconn_ptr)
+ cdef int rv = libpq.PQflush(self.pgconn_ptr)
if rv < 0:
raise PQerror(f"flushing failed:{error_message(self)}")
return rv
def get_cancel(self) -> PGcancel:
- cdef impl.PGcancel *ptr = impl.PQgetCancel(self.pgconn_ptr)
+ cdef libpq.PGcancel *ptr = libpq.PQgetCancel(self.pgconn_ptr)
if not ptr:
raise PQerror("couldn't create cancel object")
return PGcancel._from_ptr(ptr)
def notifies(self) -> Optional[PGnotify]:
- cdef impl.PGnotify *ptr
+ cdef libpq.PGnotify *ptr
with nogil:
- ptr = impl.PQnotifies(self.pgconn_ptr)
+ ptr = libpq.PQnotifies(self.pgconn_ptr)
if ptr:
ret = PGnotify(ptr.relname, ptr.be_pid, ptr.extra)
- impl.PQfreemem(ptr)
+ libpq.PQfreemem(ptr)
return ret
else:
return None
cdef int rv
cdef const char *cbuffer = PyBytes_AsString(buffer)
cdef int length = len(buffer)
- rv = impl.PQputCopyData(self.pgconn_ptr, cbuffer, length)
+ rv = libpq.PQputCopyData(self.pgconn_ptr, cbuffer, length)
if rv < 0:
raise PQerror(f"sending copy data failed: {error_message(self)}")
return rv
cdef const char *cerr = NULL
if error is not None:
cerr = PyBytes_AsString(error)
- rv = impl.PQputCopyEnd(self.pgconn_ptr, cerr)
+ rv = libpq.PQputCopyEnd(self.pgconn_ptr, cerr)
if rv < 0:
raise PQerror(f"sending copy end failed: {error_message(self)}")
return rv
def get_copy_data(self, async_: int) -> Tuple[int, bytes]:
cdef char *buffer_ptr = NULL
cdef int nbytes
- nbytes = impl.PQgetCopyData(self.pgconn_ptr, &buffer_ptr, async_)
+ nbytes = libpq.PQgetCopyData(self.pgconn_ptr, &buffer_ptr, async_)
if nbytes == -2:
raise PQerror(f"receiving copy data failed: {error_message(self)}")
if buffer_ptr is not NULL:
# TODO: do it without copy
data = buffer_ptr[:nbytes]
- impl.PQfreemem(buffer_ptr)
+ libpq.PQfreemem(buffer_ptr)
return nbytes, data
else:
return nbytes, b""
def make_empty_result(self, exec_status: ExecStatus) -> PGresult:
- cdef impl.PGresult *rv = impl.PQmakeEmptyPGresult(
+ cdef libpq.PGresult *rv = libpq.PQmakeEmptyPGresult(
self.pgconn_ptr, exec_status)
if not rv:
raise MemoryError("couldn't allocate empty PGresult")
return func(self.pgconn_ptr)
-cdef void notice_receiver(void *arg, const impl.PGresult *res_ptr) with gil:
+cdef void notice_receiver(void *arg, const libpq.PGresult *res_ptr) with gil:
cdef PGconn pgconn = <object>arg
if pgconn.notice_handler is None:
return
- cdef PGresult res = PGresult._from_ptr(<impl.PGresult *>res_ptr)
+ cdef PGresult res = PGresult._from_ptr(<libpq.PGresult *>res_ptr)
try:
pgconn.notice_handler(res)
except Exception as e:
self.pgresult_ptr = NULL
@staticmethod
- cdef PGresult _from_ptr(impl.PGresult *ptr):
+ cdef PGresult _from_ptr(libpq.PGresult *ptr):
cdef PGresult rv = PGresult.__new__(PGresult)
rv.pgresult_ptr = ptr
return rv
def clear(self) -> None:
if self.pgresult_ptr is not NULL:
- impl.PQclear(self.pgresult_ptr)
+ libpq.PQclear(self.pgresult_ptr)
self.pgresult_ptr = NULL
@property
@property
def status(self) -> ExecStatus:
- cdef int rv = impl.PQresultStatus(self.pgresult_ptr)
+ cdef int rv = libpq.PQresultStatus(self.pgresult_ptr)
return ExecStatus(rv)
@property
def error_message(self) -> bytes:
- return impl.PQresultErrorMessage(self.pgresult_ptr)
+ return libpq.PQresultErrorMessage(self.pgresult_ptr)
def error_field(self, fieldcode: DiagnosticField) -> Optional[bytes]:
- cdef char * rv = impl.PQresultErrorField(self.pgresult_ptr, fieldcode)
+ cdef char * rv = libpq.PQresultErrorField(self.pgresult_ptr, fieldcode)
if rv is not NULL:
return rv
else:
@property
def ntuples(self) -> int:
- return impl.PQntuples(self.pgresult_ptr)
+ return libpq.PQntuples(self.pgresult_ptr)
@property
def nfields(self) -> int:
- return impl.PQnfields(self.pgresult_ptr)
+ return libpq.PQnfields(self.pgresult_ptr)
def fname(self, column_number: int) -> Optional[bytes]:
- cdef char *rv = impl.PQfname(self.pgresult_ptr, column_number)
+ cdef char *rv = libpq.PQfname(self.pgresult_ptr, column_number)
if rv is not NULL:
return rv
else:
return None
def ftable(self, column_number: int) -> int:
- return impl.PQftable(self.pgresult_ptr, column_number)
+ return libpq.PQftable(self.pgresult_ptr, column_number)
def ftablecol(self, column_number: int) -> int:
- return impl.PQftablecol(self.pgresult_ptr, column_number)
+ return libpq.PQftablecol(self.pgresult_ptr, column_number)
def fformat(self, column_number: int) -> Format:
- return Format(impl.PQfformat(self.pgresult_ptr, column_number))
+ return Format(libpq.PQfformat(self.pgresult_ptr, column_number))
def ftype(self, column_number: int) -> int:
- return impl.PQftype(self.pgresult_ptr, column_number)
+ return libpq.PQftype(self.pgresult_ptr, column_number)
def fmod(self, column_number: int) -> int:
- return impl.PQfmod(self.pgresult_ptr, column_number)
+ return libpq.PQfmod(self.pgresult_ptr, column_number)
def fsize(self, column_number: int) -> int:
- return impl.PQfsize(self.pgresult_ptr, column_number)
+ return libpq.PQfsize(self.pgresult_ptr, column_number)
@property
def binary_tuples(self) -> Format:
- return Format(impl.PQbinaryTuples(self.pgresult_ptr))
+ return Format(libpq.PQbinaryTuples(self.pgresult_ptr))
def get_value(
self, row_number: int, column_number: int
) -> Optional[bytes]:
cdef int crow = row_number
cdef int ccol = column_number
- cdef int length = impl.PQgetlength(self.pgresult_ptr, crow, ccol)
+ cdef int length = libpq.PQgetlength(self.pgresult_ptr, crow, ccol)
cdef char *v;
if length:
- v = impl.PQgetvalue(self.pgresult_ptr, crow, ccol)
+ v = libpq.PQgetvalue(self.pgresult_ptr, crow, ccol)
# TODO: avoid copy
return v[:length]
else:
- if impl.PQgetisnull(self.pgresult_ptr, crow, ccol):
+ if libpq.PQgetisnull(self.pgresult_ptr, crow, ccol):
return None
else:
return b""
@property
def nparams(self) -> int:
- return impl.PQnparams(self.pgresult_ptr)
+ return libpq.PQnparams(self.pgresult_ptr)
def param_type(self, param_number: int) -> int:
- return impl.PQparamtype(self.pgresult_ptr, param_number)
+ return libpq.PQparamtype(self.pgresult_ptr, param_number)
@property
def command_status(self) -> Optional[bytes]:
- cdef char *rv = impl.PQcmdStatus(self.pgresult_ptr)
+ cdef char *rv = libpq.PQcmdStatus(self.pgresult_ptr)
if rv is not NULL:
return rv
else:
@property
def command_tuples(self) -> Optional[int]:
- cdef char *rv = impl.PQcmdTuples(self.pgresult_ptr)
+ cdef char *rv = libpq.PQcmdTuples(self.pgresult_ptr)
if rv is NULL:
return None
cdef bytes brv = rv
@property
def oid_value(self) -> int:
- return impl.PQoidValue(self.pgresult_ptr)
+ return libpq.PQoidValue(self.pgresult_ptr)
def set_attributes(self, descriptions: List[PGresAttDesc]):
cdef int num = len(descriptions)
- cdef impl.PGresAttDesc *attrs = <impl.PGresAttDesc *>PyMem_Malloc(
- num * sizeof(impl.PGresAttDesc))
+ cdef libpq.PGresAttDesc *attrs = <libpq.PGresAttDesc *>PyMem_Malloc(
+ num * sizeof(libpq.PGresAttDesc))
for i in range(num):
descr = descriptions[i]
attrs[i].typlen = descr.typlen
attrs[i].atttypmod = descr.atttypmod
- cdef int res = impl.PQsetResultAttrs(self.pgresult_ptr, num, attrs);
+ cdef int res = libpq.PQsetResultAttrs(self.pgresult_ptr, num, attrs);
PyMem_Free(attrs)
if (res == 0):
raise PQerror("PQsetResultAttrs failed")