class Column(Sequence[Any]):
-
__module__ = "psycopg"
def __init__(self, cursor: "BaseCursor[Any, Any]", index: int):
class BasePipeline:
-
command_queue: Deque[PipelineCommand]
result_queue: Deque[PendingResult]
_is_supported: Optional[bool] = None
class Transformer(Protocol):
-
types: Optional[Tuple[int, ...]]
formats: Optional[List[pq.Format]]
cursor_factory: Optional[Type[AsyncCursor[Row]]] = None,
**kwargs: Any,
) -> "AsyncConnection[Any]":
-
if sys.platform == "win32":
loop = asyncio.get_running_loop()
if isinstance(loop, asyncio.ProactorEventLoop):
class TextFormatter(Formatter):
-
format = TEXT
def __init__(self, transformer: Transformer, encoding: str = "utf-8"):
class BinaryFormatter(Formatter):
-
format = BINARY
def __init__(self, transformer: Transformer):
class _CrdbConnectionMixin:
-
_adapters: Optional[AdaptersMap]
pgconn: "PGconn"
raise e.ProgrammingError("stream() cannot be used in pipeline mode")
with self._conn.lock:
-
try:
self._conn.wait(self._stream_send_gen(query, params, binary=binary))
first = True
raise e.ProgrammingError("stream() cannot be used in pipeline mode")
async with self._conn.lock:
-
try:
await self._conn.wait(
self._stream_send_gen(query, params, binary=binary)
def register_default_types(types: TypesRegistry) -> None:
-
from .types.range import RangeInfo
from .types.multirange import MultirangeInfo
def register_default_adapters(context: AdaptContext) -> None:
-
from .types import array, bool, composite, datetime, enum, json, multirange
from .types import net, none, numeric, range, string, uuid
reprs = []
for arg in args:
reprs.append(f"{arg!r}")
- for (k, v) in kwargs.items():
+ for k, v in kwargs.items():
reprs.append(f"{k}={v!r}")
logger.info("PGconn.%s(%s)", f.__name__, ", ".join(reprs))
class PGconn(Protocol):
-
notice_handler: Optional[Callable[["PGresult"], None]]
notify_handler: Optional[Callable[["PGnotify"], None]]
yield from self._conn._exec_command(query)
def _make_declare_statement(self, query: Query) -> sql.Composed:
-
if isinstance(query, bytes):
query = query.decode(self._encoding)
if not isinstance(query, sql.Composable):
class ListDumper(BaseListDumper):
-
delimiter = b","
def get_key(self, obj: List[Any], format: PyFormat) -> DumperKey:
class ListBinaryDumper(BaseListDumper):
-
format = pq.Format.BINARY
def get_key(self, obj: List[Any], format: PyFormat) -> DumperKey:
class ArrayLoader(RecursiveLoader):
-
delimiter = b","
base_oid: int
class ArrayBinaryLoader(RecursiveLoader):
-
format = pq.Format.BINARY
def load(self, data: Buffer) -> List[Any]:
class BoolDumper(Dumper):
-
oid = _oids.BOOL_OID
def dump(self, obj: bool) -> bytes:
class BoolBinaryDumper(Dumper):
-
format = Format.BINARY
oid = _oids.BOOL_OID
class BoolBinaryLoader(Loader):
-
format = Format.BINARY
def load(self, data: Buffer) -> bool:
class TupleDumper(SequenceDumper):
-
# Should be this, but it doesn't work
# oid = _oids.RECORD_OID
class TupleBinaryDumper(RecursiveDumper):
-
format = pq.Format.BINARY
# Subclasses must set an info
class CompositeLoader(RecordLoader):
-
factory: Callable[..., Any]
fields_types: List[int]
_types_set = False
class CompositeBinaryLoader(RecordBinaryLoader):
-
format = pq.Format.BINARY
factory: Callable[..., Any]
class DateDumper(Dumper):
-
oid = _oids.DATE_OID
def dump(self, obj: date) -> bytes:
class DateBinaryDumper(Dumper):
-
format = Format.BINARY
oid = _oids.DATE_OID
class TimeDumper(_BaseTimeTextDumper):
-
oid = _oids.TIME_OID
def upgrade(self, obj: time, format: PyFormat) -> Dumper:
class TimeTzDumper(_BaseTimeTextDumper):
-
oid = _oids.TIMETZ_OID
class TimeBinaryDumper(_BaseTimeDumper):
-
format = Format.BINARY
oid = _oids.TIME_OID
class TimeTzBinaryDumper(_BaseTimeDumper):
-
format = Format.BINARY
oid = _oids.TIMETZ_OID
class DatetimeDumper(_BaseDatetimeTextDumper):
-
oid = _oids.TIMESTAMPTZ_OID
def upgrade(self, obj: datetime, format: PyFormat) -> Dumper:
class DatetimeNoTzDumper(_BaseDatetimeTextDumper):
-
oid = _oids.TIMESTAMP_OID
class DatetimeBinaryDumper(_BaseDatetimeDumper):
-
format = Format.BINARY
oid = _oids.TIMESTAMPTZ_OID
class DatetimeNoTzBinaryDumper(_BaseDatetimeDumper):
-
format = Format.BINARY
oid = _oids.TIMESTAMP_OID
class TimedeltaDumper(Dumper):
-
oid = _oids.INTERVAL_OID
def __init__(self, cls: type, context: Optional[AdaptContext] = None):
class TimedeltaBinaryDumper(Dumper):
-
format = Format.BINARY
oid = _oids.INTERVAL_OID
class DateLoader(Loader):
-
_ORDER_YMD = 0
_ORDER_DMY = 1
_ORDER_MDY = 2
class DateBinaryLoader(Loader):
-
format = Format.BINARY
def load(self, data: Buffer) -> date:
class TimeLoader(Loader):
-
_re_format = re.compile(rb"^(\d+):(\d+):(\d+)(?:\.(\d+))?")
def load(self, data: Buffer) -> time:
class TimeBinaryLoader(Loader):
-
format = Format.BINARY
def load(self, data: Buffer) -> time:
class TimetzLoader(Loader):
-
_re_format = re.compile(
rb"""(?ix)
^
class TimetzBinaryLoader(Loader):
-
format = Format.BINARY
def load(self, data: Buffer) -> time:
class TimestampLoader(Loader):
-
_re_format = re.compile(
rb"""(?ix)
^
class TimestampBinaryLoader(Loader):
-
format = Format.BINARY
def load(self, data: Buffer) -> datetime:
class TimestamptzLoader(Loader):
-
_re_format = re.compile(
rb"""(?ix)
^
class TimestamptzBinaryLoader(Loader):
-
format = Format.BINARY
def __init__(self, oid: int, context: Optional[AdaptContext] = None):
class IntervalLoader(Loader):
-
_re_interval = re.compile(
rb"""
(?: ([-+]?\d+) \s+ years? \s* )? # Years
class IntervalBinaryLoader(Loader):
-
format = Format.BINARY
def load(self, data: Buffer) -> timedelta:
tokens.append('"')
for k, v in obj.items():
-
if not isinstance(k, str):
raise e.DataError("hstore keys can only be strings")
add_token(k)
class _JsonDumper(Dumper):
-
# The globally used JSON dumps() function. It can be changed globally (by
# set_json_dumps) or by a subclass.
_dumps: JsonDumpsFunction = json.dumps
class JsonDumper(_JsonDumper):
-
oid = _oids.JSON_OID
class JsonBinaryDumper(_JsonDumper):
-
format = Format.BINARY
oid = _oids.JSON_OID
class JsonbDumper(_JsonDumper):
-
oid = _oids.JSONB_OID
class JsonbBinaryDumper(_JsonDumper):
-
format = Format.BINARY
oid = _oids.JSONB_OID
class _JsonLoader(Loader):
-
# The globally used JSON loads() function. It can be changed globally (by
# set_json_loads) or by a subclass.
_loads: JsonLoadsFunction = json.loads
class JsonbBinaryLoader(_JsonLoader):
-
format = Format.BINARY
def load(self, data: Buffer) -> Any:
class MultirangeBinaryDumper(BaseMultirangeDumper):
-
format = Format.BINARY
def dump(self, obj: Multirange[Any]) -> Buffer:
class BaseMultirangeLoader(RecursiveLoader, Generic[T]):
-
subtype_oid: int
def __init__(self, oid: int, context: Optional[AdaptContext] = None):
class MultirangeBinaryLoader(BaseMultirangeLoader[T]):
-
format = Format.BINARY
def load(self, data: Buffer) -> Multirange[T]:
class InterfaceDumper(Dumper):
-
oid = _oids.INET_OID
def dump(self, obj: Interface) -> bytes:
class NetworkDumper(Dumper):
-
oid = _oids.CIDR_OID
def dump(self, obj: Network) -> bytes:
class NetworkBinaryDumper(Dumper):
-
format = Format.BINARY
oid = _oids.CIDR_OID
class InetBinaryLoader(_LazyIpaddressLoader):
-
format = Format.BINARY
def load(self, data: Buffer) -> Union[Address, Interface]:
class CidrBinaryLoader(_LazyIpaddressLoader):
-
format = Format.BINARY
def load(self, data: Buffer) -> Network:
class _SpecialValuesDumper(Dumper):
-
_special: Dict[bytes, bytes] = {}
def dump(self, obj: Any) -> bytes:
class FloatDumper(_SpecialValuesDumper):
-
oid = _oids.FLOAT8_OID
_special = {
class FloatBinaryDumper(Dumper):
-
format = Format.BINARY
oid = _oids.FLOAT8_OID
class Float4BinaryDumper(FloatBinaryDumper):
-
oid = _oids.FLOAT4_OID
def dump(self, obj: float) -> bytes:
class DecimalDumper(_SpecialValuesDumper):
-
oid = _oids.NUMERIC_OID
def dump(self, obj: Decimal) -> bytes:
class Int2BinaryDumper(Int2Dumper):
-
format = Format.BINARY
def dump(self, obj: int) -> bytes:
class Int4BinaryDumper(Int4Dumper):
-
format = Format.BINARY
def dump(self, obj: int) -> bytes:
class Int8BinaryDumper(Int8Dumper):
-
format = Format.BINARY
def dump(self, obj: int) -> bytes:
class IntNumericBinaryDumper(IntNumericDumper):
-
format = Format.BINARY
def dump(self, obj: int) -> Buffer:
class OidBinaryDumper(OidDumper):
-
format = Format.BINARY
def dump(self, obj: int) -> bytes:
class IntBinaryDumper(IntDumper):
-
format = Format.BINARY
_int2_dumper = Int2BinaryDumper(Int2)
class Int2BinaryLoader(Loader):
-
format = Format.BINARY
def load(self, data: Buffer) -> int:
class Int4BinaryLoader(Loader):
-
format = Format.BINARY
def load(self, data: Buffer) -> int:
class Int8BinaryLoader(Loader):
-
format = Format.BINARY
def load(self, data: Buffer) -> int:
class OidBinaryLoader(Loader):
-
format = Format.BINARY
def load(self, data: Buffer) -> int:
class Float4BinaryLoader(Loader):
-
format = Format.BINARY
def load(self, data: Buffer) -> float:
class Float8BinaryLoader(Loader):
-
format = Format.BINARY
def load(self, data: Buffer) -> float:
class NumericBinaryLoader(Loader):
-
format = Format.BINARY
def load(self, data: Buffer) -> Decimal:
class DecimalBinaryDumper(Dumper):
-
format = Format.BINARY
oid = _oids.NUMERIC_OID
class NumericBinaryDumper(Dumper):
-
format = Format.BINARY
oid = _oids.NUMERIC_OID
class RangeBinaryDumper(BaseRangeDumper):
-
format = Format.BINARY
def dump(self, obj: Range[Any]) -> Buffer:
class RangeBinaryLoader(BaseRangeLoader[T]):
-
format = Format.BINARY
def load(self, data: Buffer) -> Range[T]:
class StrBinaryDumper(_StrBinaryDumper):
-
oid = _oids.TEXT_OID
class StrBinaryDumperVarchar(_StrBinaryDumper):
-
oid = _oids.VARCHAR_OID
class StrBinaryDumperName(_StrBinaryDumper):
-
oid = _oids.NAME_OID
class StrDumperVarchar(_StrDumper):
-
oid = _oids.VARCHAR_OID
class StrDumperName(_StrDumper):
-
oid = _oids.NAME_OID
class TextBinaryLoader(TextLoader):
-
format = Format.BINARY
class BytesDumper(Dumper):
-
oid = _oids.BYTEA_OID
_qprefix = b""
class BytesBinaryDumper(Dumper):
-
format = Format.BINARY
oid = _oids.BYTEA_OID
class ByteaLoader(Loader):
-
_escaping: "EscapingProto"
def __init__(self, oid: int, context: Optional[AdaptContext] = None):
class ByteaBinaryLoader(Loader):
-
format = Format.BINARY
def load(self, data: Buffer) -> Buffer:
class UUIDDumper(Dumper):
-
oid = _oids.UUID_OID
def dump(self, obj: "uuid.UUID") -> bytes:
class UUIDBinaryDumper(UUIDDumper):
-
format = Format.BINARY
def dump(self, obj: "uuid.UUID") -> bytes:
class UUIDBinaryLoader(UUIDLoader):
-
format = Format.BINARY
def load(self, data: Buffer) -> "uuid.UUID":
pytest-cov >= 3.0
pytest-randomly >= 3.5
dev =
- black >= 22.3.0
+ black >= 23.1.0
dnspython >= 2.1
flake8 >= 4.0
mypy >= 0.990
class BasePool(Generic[ConnectionType]):
-
# Used to generate pool names
_num_pool = 0
connections: Sequence[Connection[Any]] = (),
timeout: float = 0.0,
) -> None:
-
# Stop the scheduler
self._sched.enter(0, None)
pytest-randomly == 3.5.0
# From the 'dev' extra
-black == 22.3.0
+black == 23.1.0
dnspython == 2.1.0
flake8 == 4.0.0
types-setuptools == 57.4.0
with NullConnectionPool(dsn, max_size=1, reset=reset) as p:
with p.connection() as conn:
-
# Queue the worker so it will take the same connection a second time
# instead of making a new one.
t = Thread(target=worker)
with NullConnectionPool(dsn, max_size=1, reset=reset) as p:
with p.connection() as conn:
-
t = Thread(target=worker)
t.start()
ensure_waiting(p)
with NullConnectionPool(dsn, max_size=1, reset=reset) as p:
with p.connection() as conn:
-
t = Thread(target=worker)
t.start()
ensure_waiting(p)
async with AsyncNullConnectionPool(dsn, max_size=1, reset=reset) as p:
async with p.connection() as conn:
-
# Queue the worker so it will take the same connection a second time
# instead of making a new one.
t = create_task(worker())
async with AsyncNullConnectionPool(dsn, max_size=1, reset=reset) as p:
async with p.connection() as conn:
-
t = create_task(worker())
await ensure_waiting(p)
async with AsyncNullConnectionPool(dsn, max_size=1, reset=reset) as p:
async with p.connection() as conn:
-
t = create_task(worker())
await ensure_waiting(p)
@pytest.mark.slow
@pytest.mark.timing
def test_shrink(dsn, monkeypatch):
-
from psycopg_pool.pool import ShrinkPool
results: List[Tuple[int, int]] = []
@pytest.mark.slow
@pytest.mark.timing
async def test_shrink(dsn, monkeypatch):
-
from psycopg_pool.pool_async import ShrinkPool
results: List[Tuple[int, int]] = []
def main() -> None:
-
args = parse_cmdline()
ids[:] = range(args.ntests)
def test_dumper_protocol(conn):
-
# This class doesn't inherit from adapt.Dumper but passes a mypy check
from .adapters_example import MyStrDumper
def test_loader_protocol(conn):
-
# This class doesn't inherit from adapt.Loader but passes a mypy check
from .adapters_example import MyTextLoader
@pytest.mark.slow
@pytest.mark.skipif(_psycopg is None, reason="C module test")
def test_optimised_adapters():
-
# All the optimised adapters available
c_adapters = {}
for n in dir(_psycopg):
async def test_srv_async(conninfo, want, env, afake_srv, setpgenv):
setpgenv(env)
params = conninfo_to_dict(conninfo)
- params = await (
- psycopg._dns.resolve_srv_async(params) # type: ignore[attr-defined]
- )
+ params = await psycopg._dns.resolve_srv_async(params) # type: ignore[attr-defined]
assert conninfo_to_dict(want) == params
# check the values returned
assert len(okvals) == len(xids)
- for (xid, (gid, prepared, owner, database)) in zip(xids, okvals):
+ for xid, (gid, prepared, owner, database) in zip(xids, okvals):
assert xid.gtrid == gid
assert xid.prepared == prepared
assert xid.owner == owner
# check the values returned
assert len(okvals) == len(xids)
- for (xid, (gid, prepared, owner, database)) in zip(xids, okvals):
+ for xid, (gid, prepared, owner, database) in zip(xids, okvals):
assert xid.gtrid == gid
assert xid.prepared == prepared
assert xid.owner == owner
@_status
@_info_cls
def test_fetch_not_found(conn, name, status, info_cls, monkeypatch):
-
if TypeInfo._has_to_regtype_function(conn):
exit_orig = psycopg.Transaction.__exit__
@_status
@_info_cls
async def test_fetch_not_found_async(aconn, name, status, info_cls, monkeypatch):
-
if TypeInfo._has_to_regtype_function(aconn):
exit_orig = psycopg.AsyncTransaction.__aexit__
self.coords = (x1, y1, x2, y2)
class BoxDumper(Dumper):
-
format = pq.Format.TEXT
oid = psycopg.postgres.types["box"].oid
@pytest.mark.parametrize("val", [True, False])
def test_quote_bool(conn, val):
-
tx = Transformer()
assert tx.get_dumper(val, PyFormat.TEXT).quote(val) == str(val).lower().encode(
"ascii"
def test_quote_none(conn):
-
tx = Transformer()
assert tx.get_dumper(None, PyFormat.TEXT).quote(None) == b"NULL"
def test_quote_none(conn):
-
tx = Transformer()
assert tx.get_dumper(None, PyFormat.TEXT).quote(None) == b"NULL"
def get_py_oids(conn: Connection) -> List[str]:
lines = []
- for (typname, oid) in conn.execute(
+ for typname, oid in conn.execute(
"""
select typname, oid
from pg_type
# Note: "record" is a pseudotype but still a useful one to have.
# "pg_lsn" is a documented public type and useful in streaming replication
lines = []
- for (typname, oid, typarray, regtype, typdelim) in conn.execute(
+ for typname, oid, typarray, regtype, typdelim in conn.execute(
"""
select typname, oid, typarray,
-- CRDB might have quotes in the regtype representation
def get_py_ranges(conn: Connection) -> List[str]:
lines = []
- for (typname, oid, typarray, rngsubtype) in conn.execute(
+ for typname, oid, typarray, rngsubtype in conn.execute(
"""
select typname, oid, typarray, rngsubtype
from
def get_py_multiranges(conn: Connection) -> List[str]:
lines = []
- for (typname, oid, typarray, rngtypid, rngsubtype) in conn.execute(
+ for typname, oid, typarray, rngtypid, rngsubtype in conn.execute(
"""
select typname, oid, typarray, rngtypid, rngsubtype
from
def get_cython_oids(conn: Connection) -> List[str]:
lines = []
- for (typname, oid) in conn.execute(
+ for typname, oid in conn.execute(
"""
select typname, oid
from pg_type