NotifyHandler = Callable[[Notify], None]
+_null_row_factory: RowFactory = object() # type: ignore[assignment]
+
+
class BaseConnection(AdaptContext):
"""
Base class for different types of connections.
ConnStatus = pq.ConnStatus
TransactionStatus = pq.TransactionStatus
+ row_factory: Optional[RowFactory] = None
+
def __init__(self, pgconn: "PGconn"):
self.pgconn = pgconn # TODO: document this
self._autocommit = False
conninfo: str = "",
*,
autocommit: bool = False,
+ row_factory: Optional[RowFactory] = None,
**kwargs: Any,
) -> PQGenConn[ConnectionType]:
"""Generator to connect to the database and create a new instance."""
pgconn = yield from connect(conninfo)
conn = cls(pgconn)
conn._autocommit = autocommit
+ conn.row_factory = row_factory
return conn
def _exec_command(self, command: Query) -> PQGen["PGresult"]:
@classmethod
def connect(
- cls, conninfo: str = "", *, autocommit: bool = False, **kwargs: Any
+ cls,
+ conninfo: str = "",
+ *,
+ autocommit: bool = False,
+ row_factory: Optional[RowFactory] = None,
+ **kwargs: Any,
) -> "Connection":
"""
Connect to a database server and return a new `Connection` instance.
TODO: connection_timeout to be implemented.
"""
return cls._wait_conn(
- cls._connect_gen(conninfo, autocommit=autocommit, **kwargs)
+ cls._connect_gen(
+ conninfo,
+ autocommit=autocommit,
+ row_factory=row_factory,
+ **kwargs,
+ )
)
def __enter__(self) -> "Connection":
name: str = "",
*,
binary: bool = False,
- row_factory: Optional[RowFactory] = None,
+ row_factory: Optional[RowFactory] = _null_row_factory,
) -> Union[Cursor, ServerCursor]:
"""
Return a new cursor to send commands and queries to the connection.
"""
format = Format.BINARY if binary else Format.TEXT
+ if row_factory is _null_row_factory:
+ row_factory = self.row_factory
if name:
return ServerCursor(
self, name=name, format=format, row_factory=row_factory
params: Optional[Params] = None,
*,
prepare: Optional[bool] = None,
- row_factory: Optional[RowFactory] = None,
+ row_factory: Optional[RowFactory] = _null_row_factory,
) -> Cursor:
"""Execute a query and return a cursor to read its results."""
+ if row_factory is _null_row_factory:
+ row_factory = self.row_factory
cur = self.cursor(row_factory=row_factory)
return cur.execute(query, params, prepare=prepare)
@classmethod
async def connect(
- cls, conninfo: str = "", *, autocommit: bool = False, **kwargs: Any
+ cls,
+ conninfo: str = "",
+ *,
+ autocommit: bool = False,
+ row_factory: Optional[RowFactory] = None,
+ **kwargs: Any,
) -> "AsyncConnection":
return await cls._wait_conn(
- cls._connect_gen(conninfo, autocommit=autocommit, **kwargs)
+ cls._connect_gen(
+ conninfo,
+ autocommit=autocommit,
+ row_factory=row_factory,
+ **kwargs,
+ )
)
async def __aenter__(self) -> "AsyncConnection":
name: str = "",
*,
binary: bool = False,
- row_factory: Optional[RowFactory] = None,
+ row_factory: Optional[RowFactory] = _null_row_factory,
) -> Union[AsyncCursor, AsyncServerCursor]:
"""
Return a new `AsyncCursor` to send commands and queries to the connection.
"""
format = Format.BINARY if binary else Format.TEXT
+ if row_factory is _null_row_factory:
+ row_factory = self.row_factory
if name:
return AsyncServerCursor(
self, name=name, format=format, row_factory=row_factory
params: Optional[Params] = None,
*,
prepare: Optional[bool] = None,
- row_factory: Optional[RowFactory] = None,
+ row_factory: Optional[RowFactory] = _null_row_factory,
) -> AsyncCursor:
+ if row_factory is _null_row_factory:
+ row_factory = self.row_factory
cur = self.cursor(row_factory=row_factory)
return await cur.execute(query, params, prepare=prepare)
from psycopg3 import Connection, Notify
from psycopg3.errors import UndefinedTable
from psycopg3.conninfo import conninfo_to_dict
+from .test_cursor import my_row_factory
def test_connect(dsn):
assert cur.fetchone() == {1, 2}
+def test_row_factory(dsn):
+ conn = Connection.connect(dsn, row_factory=my_row_factory)
+ assert conn.row_factory
+
+ cur = conn.execute("select 'a' as ve")
+ assert cur.fetchone() == ["Ave"]
+
+ cur = conn.execute("select 'a' as ve", row_factory=None)
+ assert cur.fetchone() == ("a",)
+
+ with conn.cursor(row_factory=lambda c: set) as cur:
+ cur.execute("select 1, 1, 2")
+ assert cur.fetchall() == [{1, 2}]
+
+ with conn.cursor(row_factory=None) as cur:
+ cur.execute("select 1, 1, 2")
+ assert cur.fetchall() == [(1, 1, 2)]
+
+ conn.row_factory = None
+ cur = conn.execute("select 'vale'")
+ assert cur.fetchone() == ("vale",)
+
+
def test_str(conn):
assert "[IDLE]" in str(conn)
conn.close()
from psycopg3 import AsyncConnection, Notify
from psycopg3.errors import UndefinedTable
from psycopg3.conninfo import conninfo_to_dict
+from .test_cursor import my_row_factory
pytestmark = pytest.mark.asyncio
assert await cur.fetchone() == {1, 2}
+async def test_row_factory(dsn):
+ conn = await AsyncConnection.connect(dsn, row_factory=my_row_factory)
+ assert conn.row_factory
+
+ cur = await conn.execute("select 'a' as ve")
+ assert await cur.fetchone() == ["Ave"]
+
+ cur = await conn.execute("select 'a' as ve", row_factory=None)
+ assert await cur.fetchone() == ("a",)
+
+ async with conn.cursor(row_factory=lambda c: set) as cur:
+ await cur.execute("select 1, 1, 2")
+ assert await cur.fetchall() == [{1, 2}]
+
+ async with conn.cursor(row_factory=None) as cur:
+ await cur.execute("select 1, 1, 2")
+ assert await cur.fetchall() == [(1, 1, 2)]
+
+ conn.row_factory = None
+ cur = await conn.execute("select 'vale'")
+ assert await cur.fetchone() == ("vale",)
+
+
async def test_str(aconn):
assert "[IDLE]" in str(aconn)
await aconn.close()