Mostly to work around the obsolete libpq version available for aarch64.
import sys
from ctypes import Structure, CFUNCTYPE, POINTER
from ctypes import c_char, c_char_p, c_int, c_size_t, c_ubyte, c_uint, c_void_p
-from typing import List, Tuple
+from typing import List, Optional, Tuple
from psycopg.errors import NotSupportedError
_PQhostaddr.restype = c_char_p
-def PQhostaddr(pgconn: type) -> bytes:
- if _PQhostaddr:
- return _PQhostaddr(pgconn)
- else:
+def PQhostaddr(pgconn: PGconn_struct) -> bytes:
+ if not _PQhostaddr:
raise NotSupportedError(
f"PQhostaddr requires libpq from PostgreSQL 12,"
f" {libpq_version} available instead"
)
+ return _PQhostaddr(pgconn)
+
PQport = pq.PQport
PQport.argtypes = [PGconn_ptr]
PQfreemem.argtypes = [c_void_p]
PQfreemem.restype = None
-PQencryptPasswordConn = pq.PQencryptPasswordConn
-PQencryptPasswordConn.argtypes = [PGconn_ptr, c_char_p, c_char_p, c_char_p]
-PQencryptPasswordConn.restype = POINTER(c_char)
+if libpq_version >= 100000:
+ _PQencryptPasswordConn = pq.PQencryptPasswordConn
+ _PQencryptPasswordConn.argtypes = [
+ PGconn_ptr,
+ c_char_p,
+ c_char_p,
+ c_char_p,
+ ]
+ _PQencryptPasswordConn.restype = POINTER(c_char)
+
+
+def PQencryptPasswordConn(
+ pgconn: PGconn_struct, passwd: bytes, user: bytes, algorithm: bytes
+) -> Optional[bytes]:
+ if not _PQencryptPasswordConn:
+ raise NotSupportedError(
+ f"PQencryptPasswordConn requires libpq from PostgreSQL 10,"
+ f" {libpq_version} available instead"
+ )
+
+ return _PQencryptPasswordConn(pgconn, passwd, user, algorithm)
+
PQmakeEmptyPGresult = pq.PQmakeEmptyPGresult
PQmakeEmptyPGresult.argtypes = [PGconn_ptr, c_int]
def generate_stub() -> None:
import re
- from ctypes import _CFuncPtr
+ from ctypes import _CFuncPtr # type: ignore
def type2str(fname, narg, t):
if t is None:
# Copyright (C) 2020-2021 The Psycopg Team
+cdef extern from "pg_config.h":
+
+ int PG_VERSION_NUM
+
+
cdef extern from "libpq-fe.h":
# structures and types
char *PQuser(const PGconn *conn)
char *PQpass(const PGconn *conn)
char *PQhost(const PGconn *conn)
- # char *PQhostaddr(const PGconn *conn) TODO: conditional, only libpq>=12
+ char *PQhostaddr(const PGconn *conn)
char *PQport(const PGconn *conn)
char *PQtty(const PGconn *conn)
char *PQoptions(const PGconn *conn)
# 33.18. SSL Support
void PQinitOpenSSL(int do_ssl, int do_crypto)
+
+
+cdef extern from *:
+ """
+/* Hack to allow the use of old libpq versions */
+#if PG_VERSION_NUM < 100000
+#define PQencryptPasswordConn(conn, passwd, user, algorithm) NULL
+#endif
+
+#if PG_VERSION_NUM < 120000
+#define PQhostaddr(conn) NULL
+#endif
+"""
@property
def hostaddr(self) -> bytes:
- # Available only from PG 12. Use the dynamic ctypes implementation
- from psycopg.pq import _pq_ctypes
+ if libpq.PG_VERSION_NUM < 120000:
+ raise e.NotSupportedError(
+ f"PQhostaddr requires libpq from PostgreSQL 12,"
+ f" {libpq.PG_VERSION_NUM} available instead"
+ )
_ensure_pgconn(self)
- return _pq_ctypes.PQhostaddr(
- ctypes.cast(self.pgconn_ptr, _pq_ctypes.PGconn_ptr))
+ cdef char *rv = libpq.PQhostaddr(self._pgconn_ptr)
+ assert rv is not NULL
+ return rv
@property
def port(self) -> bytes:
def encrypt_password(
self, const char *passwd, const char *user, algorithm = None
) -> bytes:
+ if libpq.PG_VERSION_NUM < 100000:
+ raise e.NotSupportedError(
+ f"PQencryptPasswordConn requires libpq from PostgreSQL 10,"
+ f" {libpq.PG_VERSION_NUM} available instead"
+ )
+
cdef char *out
cdef const char *calgo = NULL
if algorithm:
assert "hello error" in rec.message
+@pytest.mark.libpq(">= 10")
def test_encrypt_password(pgconn):
enc = pgconn.encrypt_password(b"psycopg2", b"ashesh", b"md5")
assert enc == b"md594839d658c28a357126f105b9cb14cfc"
+@pytest.mark.libpq(">= 10")
def test_encrypt_password_scram(pgconn):
enc = pgconn.encrypt_password(b"psycopg2", b"ashesh", b"scram-sha-256")
assert enc.startswith(b"SCRAM-SHA-256$")
+@pytest.mark.libpq(">= 10")
def test_encrypt_password_badalgo(pgconn):
with pytest.raises(psycopg.OperationalError):
assert pgconn.encrypt_password(b"psycopg2", b"ashesh", b"wat")
+@pytest.mark.libpq(">= 10")
def test_encrypt_password_query(pgconn):
res = pgconn.exec_(b"set password_encryption to 'md5'")
assert res.status == pq.ExecStatus.COMMAND_OK
assert enc.startswith(b"SCRAM-SHA-256$")
+@pytest.mark.libpq(">= 10")
def test_encrypt_password_closed(pgconn):
pgconn.finish()
with pytest.raises(psycopg.OperationalError):
assert pgconn.encrypt_password(b"psycopg2", b"ashesh")
+@pytest.mark.libpq("< 10")
+def test_encrypt_password_not_supported(pgconn):
+ # it might even be supported, but not worth the lifetime
+ with pytest.raises(psycopg.NotSupportedError):
+ pgconn.encrypt_password(b"psycopg2", b"ashesh", b"md5")
+
+ with pytest.raises(psycopg.NotSupportedError):
+ pgconn.encrypt_password(b"psycopg2", b"ashesh", b"scram-sha-256")
+
+
def test_str(pgconn, dsn):
assert "[IDLE]" in str(pgconn)
pgconn.finish()