>>> encrypted = conn.pgconn.encrypt_password(password.encode(enc), rolename.encode(enc))
b'SCRAM-SHA-256$4096:...
+ .. .. automethod:: change_password FIXME: needs libpq 17's docs
+
.. automethod:: trace
.. automethod:: set_trace_flags
.. automethod:: untrace
termination (:ticket:`#754`).
- Add support for libpq function to retrieve results in chunks introduced in
libpq v17 (:ticket:`#793`).
+- Add support for libpq function to change role passwords introduced in
+ libpq v17 (:ticket:`#818`).
.. __: https://numpy.org/doc/stable/reference/arrays.scalars.html#built-in-scalar-types
def encrypt_password(self, *args: Any) -> NoReturn:
self._raise()
+ def change_password(self, *args: Any) -> NoReturn:
+ self._raise()
+
def make_empty_result(self, *args: Any) -> NoReturn:
self._raise()
else:
PQencryptPasswordConn = not_supported_before("PQencryptPasswordConn", 100000)
+if libpq_version >= 170000:
+ PQchangePassword = pq.PQchangePassword
+ PQchangePassword.argtypes = [
+ PGconn_ptr,
+ c_char_p,
+ c_char_p,
+ ]
+ PQchangePassword.restype = PGresult_ptr
+else:
+ PQchangePassword = not_supported_before("PQchangePassword", 170000)
+
PQmakeEmptyPGresult = pq.PQmakeEmptyPGresult
PQmakeEmptyPGresult.argtypes = [PGconn_ptr, c_int]
PQmakeEmptyPGresult.restype = PGresult_ptr
def PQputCopyData(arg1: Optional[PGconn_struct], arg2: bytes, arg3: int) -> int: ...
def PQuntrace(arg1: Optional[PGconn_struct]) -> None: ...
def PQfreemem(arg1: Any) -> None: ...
+def PQchangePassword(arg1: Optional[PGconn_struct], arg2: bytes, arg3: bytes) -> PGresult_struct: ...
def PQmakeEmptyPGresult(arg1: Optional[PGconn_struct], arg2: int) -> PGresult_struct: ...
def PQinitOpenSSL(arg1: int, arg2: int) -> None: ...
# autogenerated: end
self, passwd: bytes, user: bytes, algorithm: Optional[bytes] = None
) -> bytes: ...
+ def change_password(self, user: bytes, passwd: bytes) -> None: ...
+
def make_empty_result(self, exec_status: int) -> "PGresult": ...
@property
impl.PQfreemem(out)
return rv
+ def change_password(self, user: bytes, passwd: bytes) -> None:
+ """
+ Change a PostgreSQL password.
+
+ :raises OperationalError: if the command to change password failed.
+
+ See :pq:`PQchangePassword` for details.
+ """
+ res = impl.PQchangePassword(self._pgconn_ptr, user, passwd)
+ if impl.PQresultStatus(res) != ExecStatus.COMMAND_OK:
+ raise e.OperationalError(
+ f"failed to change password change command: {error_message(self)}"
+ )
+
def make_empty_result(self, exec_status: int) -> "PGresult":
rv = impl.PQmakeEmptyPGresult(self._pgconn_ptr, exec_status)
if not rv:
void PQconninfoFree(PQconninfoOption *connOptions)
char *PQencryptPasswordConn(
PGconn *conn, const char *passwd, const char *user, const char *algorithm);
+ PGresult *PQchangePassword(PGconn *conn, const char *user, const char *passwd);
PGresult *PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
int PQsetResultAttrs(PGresult *res, int numAttributes, PGresAttDesc *attDescs)
int PQlibVersion()
#if PG_VERSION_NUM < 170000
typedef struct pg_cancel_conn PGcancelConn;
+#define PQchangePassword(conn, user, passwd) NULL
#define PQclosePrepared(conn, name) NULL
#define PQclosePortal(conn, name) NULL
#define PQsendClosePrepared(conn, name) 0
from psycopg.pq import Format as PqFormat, Trace, version_pretty
from psycopg.pq.misc import PGnotify, connection_summary
+from psycopg.pq._enums import ExecStatus
from psycopg_c.pq cimport PQBuffer
cdef object _check_supported(fname, int pgversion):
libpq.PQfreemem(out)
return rv
+ def change_password(
+ self, const char *user, const char *passwd
+ ) -> None:
+ _check_supported("PQchangePassword", 170000)
+
+ cdef libpq.PGresult *res
+ res = libpq.PQchangePassword(self._pgconn_ptr, user, passwd)
+ if libpq.PQresultStatus(res) != ExecStatus.COMMAND_OK:
+ raise e.OperationalError(
+ f"password encryption failed: {error_message(self)}"
+ )
+
def make_empty_result(self, int exec_status) -> PGresult:
cdef libpq.PGresult *rv = libpq.PQmakeEmptyPGresult(
self._pgconn_ptr, <libpq.ExecStatusType>exec_status)
pgconn.trace(1)
+@pytest.mark.libpq(">= 17")
+def test_change_password_error(pgconn):
+ with pytest.raises(psycopg.OperationalError, match='role "ashesh" does not exist'):
+ pgconn.change_password(b"ashesh", b"psycopg")
+
+
+@pytest.fixture
+def role(pgconn: PGconn) -> Iterator[tuple[bytes, bytes]]:
+ user, passwd = "ashesh", "psycopg2"
+ r = pgconn.exec_(f"CREATE USER {user} LOGIN PASSWORD '{passwd}'".encode())
+ if r.status != pq.ExecStatus.COMMAND_OK:
+ pytest.skip(f"cannot create a PostgreSQL role: {r.error_message.decode()}")
+ yield user.encode(), passwd.encode()
+ r = pgconn.exec_(f"DROP USER {user}".encode())
+ if r.status != pq.ExecStatus.COMMAND_OK:
+ pytest.fail(f"failed to drop {user} role: {r.error_message.decode()}")
+
+
+@pytest.mark.libpq(">= 17")
+def test_change_password(pgconn, dsn, role):
+ user, passwd = role
+ conninfo = {e.keyword: e.val for e in pq.Conninfo.parse(dsn.encode()) if e.val}
+ conninfo |= {
+ b"dbname": b"postgres",
+ b"user": user,
+ b"password": passwd,
+ }
+ conn = pq.PGconn.connect(b" ".join(b"%s='%s'" % item for item in conninfo.items()))
+ assert conn.status == pq.ConnStatus.OK, conn.error_message
+
+ pgconn.change_password(user, b"psycopg")
+ conninfo[b"password"] = b"psycopg"
+ conn = pq.PGconn.connect(b" ".join(b"%s='%s'" % item for item in conninfo.items()))
+ assert conn.status == pq.ConnStatus.OK, conn.error_message
+
+
@pytest.mark.libpq(">= 10")
def test_encrypt_password(pgconn):
enc = pgconn.encrypt_password(b"psycopg2", b"ashesh", b"md5")