.. automethod:: connect
- Connection parameters can be passed either as a `conninfo string`__ (a
- ``postgresql://`` url or a list of ``key=value pairs``) or as keywords.
- Keyword parameters override the ones specified in the connection string.
+ :param conninfo: The `connection string`__ (a ``postgresql://`` url or
+ a list of ``key=value pairs``) to specify where and
+ how to connect.
+ :param kwargs: Further parameters specifying the connection string.
+ They override the ones specified in *conninfo*.
+ :param autocommit: If `!True` don't start transactions automatically.
+ See `transactions` for details.
+ :param row_factory: The row factory specifying what type of records
+ to create fetching data (default:
+ `~psycopg3.rows.tuple_row()`). See
+ :ref:`row-factories` for details.
.. __: https://www.postgresql.org/docs/current/libpq-connect.html
#LIBPQ-CONNSTRING
.. autoattribute:: closed
:annotation: bool
- .. automethod:: cursor
- Calling the method without a *name* creates a client-side cursor,
- specifying a *name* crates a server-side cursor. See
- :ref:`cursor-types` for the details.
+ .. method:: cursor(*, binary: bool = False, row_factory: Optional[RowFactory] = None) -> Cursor
+ .. method:: cursor(name: str, *, binary: bool = False, row_factory: Optional[RowFactory] = None) -> ServerCursor
+ :noindex:
+
+ Return a new cursor to send commands and queries to the connection.
+
+ :param name: If not specified create a client-side cursor, if
+ specified create a server-side cursor. See
+ :ref:`cursor-types` for details.
+ :param binary: If `!True` return binary values from the database. All
+ the types returned by the query must have a binary
+ loader. See :ref:`binary-data` for details.
+ :param row_factory: If specified override the `row_factory` set on the
+ connection. See :ref:`row-factories` for details.
.. note:: You can use :ref:`with conn.cursor(): ...<usage>`
to close the cursor automatically when the block is exited.
.. autoattribute:: row_factory
:annotation: RowFactory
- Writable attribute to control how result's rows are formed.
+ Writable attribute to control how result rows are formed.
See :ref:`row-factories` for details.
.. rubric:: Transaction management methods
automatically when the block is exited, but be careful about
the async quirkness: see :ref:`async-with` for details.
- .. automethod:: cursor
+ .. method:: cursor(*, binary: bool = False, row_factory: Optional[RowFactory] = None) -> AsyncCursor
+ .. method:: cursor(name: str, *, binary: bool = False, row_factory: Optional[RowFactory] = None) -> AsyncServerCursor
+ :noindex:
.. note:: You can use ``async with conn.cursor() as cur: ...`` to
close the cursor automatically when the block is exited.
Available row factories
-----------------------
-Module `psycopg3.rows` contains available row factories:
+The module `psycopg3.rows` provides the implementation for a few row factories:
.. currentmodule:: psycopg3.rows
+.. autofunction:: tuple_row
.. autofunction:: dict_row
.. autofunction:: namedtuple_row
from . import encodings
from .pq import ConnStatus, ExecStatus, TransactionStatus, Format
from .sql import Composable
+from .rows import tuple_row
from .proto import PQGen, PQGenConn, RV, RowFactory, Query, Params
from .proto import AdaptContext, ConnectionType
from .cursor import Cursor, AsyncCursor
NotifyHandler = Callable[[Notify], None]
-_null_row_factory: RowFactory = object() # type: ignore[assignment]
-
-
class BaseConnection(AdaptContext):
"""
Base class for different types of connections.
ConnStatus = pq.ConnStatus
TransactionStatus = pq.TransactionStatus
- row_factory: Optional[RowFactory] = None
+ row_factory: RowFactory = tuple_row
def __init__(self, pgconn: "PGconn"):
self.pgconn = pgconn # TODO: document this
conninfo: str = "",
*,
autocommit: bool = False,
- row_factory: Optional[RowFactory] = None,
+ row_factory: RowFactory,
**kwargs: Any,
) -> PQGenConn[ConnectionType]:
"""Generator to connect to the database and create a new instance."""
conninfo: str = "",
*,
autocommit: bool = False,
- row_factory: Optional[RowFactory] = None,
+ row_factory: RowFactory = tuple_row,
**kwargs: Any,
) -> "Connection":
"""
name: str = "",
*,
binary: bool = False,
- row_factory: Optional[RowFactory] = _null_row_factory,
+ row_factory: Optional[RowFactory] = None,
) -> Union[Cursor, ServerCursor]:
"""
Return a new cursor to send commands and queries to the connection.
"""
format = Format.BINARY if binary else Format.TEXT
- if row_factory is _null_row_factory:
+ if not row_factory:
row_factory = self.row_factory
if name:
return ServerCursor(
conninfo: str = "",
*,
autocommit: bool = False,
- row_factory: Optional[RowFactory] = None,
+ row_factory: RowFactory = tuple_row,
**kwargs: Any,
) -> "AsyncConnection":
return await cls._wait_conn(
name: str = "",
*,
binary: bool = False,
- row_factory: Optional[RowFactory] = _null_row_factory,
+ row_factory: Optional[RowFactory] = None,
) -> Union[AsyncCursor, AsyncServerCursor]:
"""
Return a new `AsyncCursor` to send commands and queries to the connection.
"""
format = Format.BINARY if binary else Format.TEXT
- if row_factory is _null_row_factory:
+ if not row_factory:
row_factory = self.row_factory
if name:
return AsyncServerCursor(
from .pq import ExecStatus, Format
from .copy import Copy, AsyncCopy
+from .rows import tuple_row
from .proto import ConnectionType, Query, Params, PQGen
from .proto import Row, RowFactory
from ._column import Column
connection: ConnectionType,
*,
format: Format = Format.TEXT,
- row_factory: Optional[RowFactory] = None,
+ row_factory: RowFactory = tuple_row,
):
self._conn = connection
self.format = format
if self._iresult < len(self._results):
self.pgresult = self._results[self._iresult]
self._tx.set_pgresult(self._results[self._iresult])
- if self._row_factory:
- self._tx.make_row = self._row_factory(self)
+ self._tx.make_row = self._row_factory(self)
self._pos = 0
nrows = self.pgresult.command_tuples
self._rowcount = nrows if nrows is not None else -1
elif res.status == ExecStatus.SINGLE_TUPLE:
self.pgresult = res
self._tx.set_pgresult(res, set_loaders=first)
- if first and self._row_factory:
+ if first:
self._tx.make_row = self._row_factory(self)
return res
self._results = list(results)
self.pgresult = results[0]
self._tx.set_pgresult(results[0])
- if self._row_factory:
- self._tx.make_row = self._row_factory(self)
+ self._tx.make_row = self._row_factory(self)
nrows = self.pgresult.command_tuples
if nrows is not None:
if self._rowcount < 0:
class RowFactory(Protocol):
- def __call__(self, __cursor: "BaseCursor[ConnectionType]") -> RowMaker:
+ def __call__(
+ self, __cursor: "BaseCursor[ConnectionType]"
+ ) -> Optional[RowMaker]:
...
from . import pq
from . import sql
from . import errors as e
+from .rows import tuple_row
from .cursor import BaseCursor, execute
from .proto import ConnectionType, Query, Params, PQGen, Row, RowFactory
name: str,
*,
format: pq.Format = pq.Format.TEXT,
- row_factory: Optional[RowFactory] = None,
+ row_factory: RowFactory = tuple_row,
):
super().__init__(connection, format=format, row_factory=row_factory)
- self._helper: ServerCursorHelper["Connection"] = ServerCursorHelper(
- name
- )
+ self._helper: ServerCursorHelper["Connection"]
+ self._helper = ServerCursorHelper(name)
self.itersize = DEFAULT_ITERSIZE
def __del__(self) -> None:
name: str,
*,
format: pq.Format = pq.Format.TEXT,
- row_factory: Optional[RowFactory] = None,
+ row_factory: RowFactory = tuple_row,
):
super().__init__(connection, format=format, row_factory=row_factory)
self._helper: ServerCursorHelper["AsyncConnection"]
import psycopg3
from psycopg3 import encodings
from psycopg3 import Connection, Notify
+from psycopg3.rows import tuple_row
from psycopg3.errors import UndefinedTable
from psycopg3.conninfo import conninfo_to_dict
from .test_cursor import my_row_factory
def test_row_factory(dsn):
+ conn = Connection.connect(dsn)
+ assert conn.row_factory is tuple_row
+
conn = Connection.connect(dsn, row_factory=my_row_factory)
- assert conn.row_factory
+ assert conn.row_factory is my_row_factory
cur = conn.execute("select 'a' as ve")
assert cur.fetchone() == ["Ave"]
cur.execute("select 1, 1, 2")
assert cur.fetchall() == [{1, 2}]
- with conn.cursor(row_factory=None) as cur:
+ with conn.cursor(row_factory=tuple_row) as cur:
cur.execute("select 1, 1, 2")
assert cur.fetchall() == [(1, 1, 2)]
- conn.row_factory = None
+ conn.row_factory = tuple_row
cur = conn.execute("select 'vale'")
assert cur.fetchone() == ("vale",)
import psycopg3
from psycopg3 import encodings
from psycopg3 import AsyncConnection, Notify
+from psycopg3.rows import tuple_row
from psycopg3.errors import UndefinedTable
from psycopg3.conninfo import conninfo_to_dict
from .test_cursor import my_row_factory
async def test_row_factory(dsn):
+ conn = await AsyncConnection.connect(dsn)
+ assert conn.row_factory is tuple_row
+
conn = await AsyncConnection.connect(dsn, row_factory=my_row_factory)
- assert conn.row_factory
+ assert conn.row_factory is my_row_factory
cur = await conn.execute("select 'a' as ve")
assert await cur.fetchone() == ["Ave"]
await cur.execute("select 1, 1, 2")
assert await cur.fetchall() == [{1, 2}]
- async with conn.cursor(row_factory=None) as cur:
+ async with conn.cursor(row_factory=tuple_row) as cur:
await cur.execute("select 1, 1, 2")
assert await cur.fetchall() == [(1, 1, 2)]
- conn.row_factory = None
+ conn.row_factory = tuple_row
cur = await conn.execute("select 'vale'")
assert await cur.fetchone() == ("vale",)