# Copyright (C) 2021 The Psycopg Team
import functools
-from typing import Any, Callable, Dict, NamedTuple, NoReturn, Sequence, Tuple
-from typing import TYPE_CHECKING, Type, TypeVar
+from typing import Any, Callable, Dict, List, Optional, NamedTuple, NoReturn
+from typing import TYPE_CHECKING, Sequence, Tuple, Type, TypeVar
from collections import namedtuple
from typing_extensions import TypeAlias
+from . import pq
from . import errors as e
from ._compat import Protocol
from ._encodings import _as_python_identifier
if TYPE_CHECKING:
from .cursor import BaseCursor, Cursor
from .cursor_async import AsyncCursor
+ from psycopg.pq.abc import PGresult
+
+TUPLES_OK = pq.ExecStatus.TUPLES_OK
+SINGLE_TUPLE = pq.ExecStatus.SINGLE_TUPLE
T = TypeVar("T", covariant=True)
The dictionary keys are taken from the column names of the returned columns.
"""
- desc = cursor.description
- if desc is None:
+ names = _get_names(cursor)
+ if names is None:
return no_result
- titles = [c.name for c in desc]
-
def dict_row_(values: Sequence[Any]) -> Dict[str, Any]:
- return dict(zip(titles, values))
+ # https://github.com/python/mypy/issues/2608
+ return dict(zip(names, values)) # type: ignore[arg-type]
return dict_row_
The field names are taken from the column names of the returned columns,
with some mangling to deal with invalid names.
"""
- desc = cursor.description
- if desc is None:
+ res = cursor.pgresult
+ if not res:
+ return no_result
+
+ nfields = _get_nfields(res)
+ if nfields is None:
return no_result
- nt = _make_nt(*(c.name for c in desc))
+ nt = _make_nt(cursor._encoding, *(res.fname(i) for i in range(nfields)))
return nt._make
@functools.lru_cache(512)
-def _make_nt(*key: str) -> Type[NamedTuple]:
- fields = []
- for s in key:
- fields.append(_as_python_identifier(s))
- return namedtuple("Row", fields) # type: ignore[return-value]
+def _make_nt(enc: str, *names: bytes) -> Type[NamedTuple]:
+ snames = tuple(_as_python_identifier(n.decode(enc)) for n in names)
+ return namedtuple("Row", snames) # type: ignore[return-value]
def class_row(cls: Type[T]) -> BaseRowFactory[T]:
:rtype: `!Callable[[Cursor],` `RowMaker`\[~T]]
"""
- def class_row_(cur: "BaseCursor[Any, Any]") -> "RowMaker[T]":
- desc = cur.description
- if desc is None:
+ def class_row_(cursor: "BaseCursor[Any, Any]") -> "RowMaker[T]":
+ names = _get_names(cursor)
+ if names is None:
return no_result
- names = [d.name for d in desc]
-
def class_row__(values: Sequence[Any]) -> T:
- return cls(**dict(zip(names, values)))
+ return cls(**dict(zip(names, values))) # type: ignore[arg-type]
return class_row__
returned by the query as keyword arguments.
"""
- def kwargs_row_(cur: "BaseCursor[Any, T]") -> "RowMaker[T]":
- desc = cur.description
- if desc is None:
+ def kwargs_row_(cursor: "BaseCursor[Any, T]") -> "RowMaker[T]":
+ names = _get_names(cursor)
+ if names is None:
return no_result
- names = [d.name for d in desc]
-
def kwargs_row__(values: Sequence[Any]) -> T:
- return func(**dict(zip(names, values)))
+ return func(**dict(zip(names, values))) # type: ignore[arg-type]
return kwargs_row__
resulting `!RowMaker` never should.
"""
raise e.InterfaceError("the cursor doesn't have a result")
+
+
+def _get_names(cursor: "BaseCursor[Any, Any]") -> Optional[List[str]]:
+ res = cursor.pgresult
+ if not res:
+ return None
+
+ nfields = _get_nfields(res)
+ if nfields is None:
+ return None
+
+ enc = cursor._encoding
+ return [
+ res.fname(i).decode(enc) for i in range(nfields) # type: ignore[union-attr]
+ ]
+
+
+def _get_nfields(res: "PGresult") -> Optional[int]:
+ """
+ Return the number of columns in a result, if it returns tuples else None
+
+ Take into account the special case of results with zero columns.
+ """
+ nfields = res.nfields
+
+ if nfields or res.status == TUPLES_OK or res.status == SINGLE_TUPLE:
+ return nfields
+ else:
+ return None