PQfreemem.argtypes = [c_void_p]
PQfreemem.restype = None
+PQencryptPasswordConn = pq.PQencryptPasswordConn
+PQencryptPasswordConn.argtypes = [PGconn_ptr, c_char_p, c_char_p, c_char_p]
+PQencryptPasswordConn.restype = POINTER(c_char)
+
PQmakeEmptyPGresult = pq.PQmakeEmptyPGresult
PQmakeEmptyPGresult.argtypes = [PGconn_ptr, c_int]
PQmakeEmptyPGresult.restype = PGresult_ptr
arg2: int,
arg3: Array[PGresAttDesc_struct], # type: ignore
) -> int: ...
+def PQencryptPasswordConn(
+ arg1: Optional[PGconn_struct],
+ arg2: bytes,
+ arg3: bytes,
+ arg4: Optional[bytes],
+) -> bytes: ...
# fmt: off
# autogenerated: start
def get_copy_data(self, async_: int) -> Tuple[int, memoryview]:
...
+ def encrypt_password(
+ self, passwd: bytes, user: bytes, algorithm: Optional[bytes] = None
+ ) -> bytes:
+ ...
+
def make_empty_result(self, exec_status: int) -> "PGresult":
...
else:
return nbytes, memoryview(b"")
+ def encrypt_password(
+ self, passwd: bytes, user: bytes, algorithm: Optional[bytes] = None
+ ) -> bytes:
+ out = impl.PQencryptPasswordConn(
+ self._pgconn_ptr, passwd, user, algorithm
+ )
+ if not out:
+ raise e.OperationalError(
+ f"password encryption failed: {error_message(self)}"
+ )
+
+ rv = string_at(out)
+ impl.PQfreemem(out)
+ return rv
+
def make_empty_result(self, exec_status: int) -> "PGresult":
rv = impl.PQmakeEmptyPGresult(self._pgconn_ptr, exec_status)
if not rv:
# 33.11. Miscellaneous Functions
void PQfreemem(void *ptr) nogil
void PQconninfoFree(PQconninfoOption *connOptions)
+ char *PQencryptPasswordConn(
+ PGconn *conn, const char *passwd, const char *user, const char *algorithm);
PGresult *PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
int PQsetResultAttrs(PGresult *res, int numAttributes, PGresAttDesc *attDescs)
int PQlibVersion()
else:
return nbytes, b"" # won't parse it, doesn't really be memoryview
+ def encrypt_password(
+ self, const char *passwd, const char *user, algorithm = None
+ ) -> bytes:
+ cdef char *out
+ cdef const char *calgo = NULL
+ if algorithm:
+ calgo = algorithm
+ out = libpq.PQencryptPasswordConn(self._pgconn_ptr, passwd, user, calgo)
+ if not out:
+ raise e.OperationalError(
+ f"password encryption failed: {error_message(self)}"
+ )
+
+ rv = bytes(out)
+ libpq.PQfreemem(out)
+ return rv
+
def make_empty_result(self, int exec_status) -> PGresult:
cdef libpq.PGresult *rv = libpq.PQmakeEmptyPGresult(
self._pgconn_ptr, <libpq.ExecStatusType>exec_status)
assert "hello error" in rec.message
+def test_encrypt_password(pgconn):
+ enc = pgconn.encrypt_password(b"psycopg2", b"ashesh", b"md5")
+ assert enc == b"md594839d658c28a357126f105b9cb14cfc"
+
+
+def test_encrypt_password_scram(pgconn):
+ enc = pgconn.encrypt_password(b"psycopg2", b"ashesh", b"scram-sha-256")
+ assert enc.startswith(b"SCRAM-SHA-256$")
+
+
+def test_encrypt_password_badalgo(pgconn):
+ with pytest.raises(psycopg.OperationalError):
+ assert pgconn.encrypt_password(b"psycopg2", b"ashesh", b"wat")
+
+
+def test_encrypt_password_query(pgconn):
+ res = pgconn.exec_(b"set password_encryption to 'md5'")
+ assert res.status == pq.ExecStatus.COMMAND_OK
+ enc = pgconn.encrypt_password(b"psycopg2", b"ashesh")
+ assert enc == b"md594839d658c28a357126f105b9cb14cfc"
+
+ res = pgconn.exec_(b"set password_encryption to 'scram-sha-256'")
+ assert res.status == pq.ExecStatus.COMMAND_OK
+ enc = pgconn.encrypt_password(b"psycopg2", b"ashesh")
+ assert enc.startswith(b"SCRAM-SHA-256$")
+
+
+def test_encrypt_password_closed(pgconn):
+ pgconn.finish()
+ with pytest.raises(psycopg.OperationalError):
+ assert pgconn.encrypt_password(b"psycopg2", b"ashesh")
+
+
def test_str(pgconn, dsn):
assert "[IDLE]" in str(pgconn)
pgconn.finish()