ConnStatus = pq.ConnStatus
TransactionStatus = pq.TransactionStatus
- def __init__(
- self,
- pgconn: "PGconn",
- row_factory: Union[RowFactory[Row], AsyncRowFactory[Row]],
- ):
+ def __init__(self, pgconn: "PGconn"):
self.pgconn = pgconn # TODO: document this
- self._row_factory = row_factory
self._autocommit = False
self._adapters = adapt.AdaptersMap(postgres.adapters)
self._notice_handlers: List[NoticeHandler] = []
conninfo: str = "",
*,
autocommit: bool = False,
- row_factory: Optional[RowFactory[Any]] = None,
) -> PQGenConn[ConnectionType]:
"""Generator to connect to the database and create a new instance."""
pgconn = yield from connect(conninfo)
- if not row_factory:
- row_factory = tuple_row
- conn = cls(pgconn, row_factory)
+ conn = cls(pgconn)
conn._autocommit = bool(autocommit)
return conn
cursor_factory: Type[Cursor[Row]]
server_cursor_factory: Type[ServerCursor[Row]]
+ row_factory: RowFactory[Row]
- def __init__(self, pgconn: "PGconn", row_factory: RowFactory[Row]):
- super().__init__(pgconn, row_factory)
+ def __init__(
+ self, pgconn: "PGconn", row_factory: Optional[RowFactory[Row]] = None
+ ):
+ super().__init__(pgconn)
+ self.row_factory = row_factory or cast(RowFactory[Row], tuple_row)
self.lock = threading.Lock()
self.cursor_factory = Cursor
self.server_cursor_factory = ServerCursor
Connect to a database server and return a new `Connection` instance.
"""
conninfo, timeout = _conninfo_connect_timeout(conninfo, **kwargs)
- return cls._wait_conn(
- cls._connect_gen(
- conninfo, autocommit=autocommit, row_factory=row_factory
- ),
+ rv = cls._wait_conn(
+ cls._connect_gen(conninfo, autocommit=autocommit),
timeout,
)
+ if row_factory:
+ rv.row_factory = row_factory
+ return rv
def __enter__(self) -> "Connection[Row]":
return self
self._closed = True
self.pgconn.finish()
- @property
- def row_factory(self) -> RowFactory[Row]:
- """Writable attribute to control how result rows are formed."""
- return cast(RowFactory[Row], self._row_factory)
-
- @row_factory.setter
- def row_factory(self, row_factory: RowFactory[Row]) -> None:
- self._row_factory = row_factory
-
@overload
def cursor(self, *, binary: bool = False) -> Cursor[Row]:
...
cursor_factory: Type[AsyncCursor[Row]]
server_cursor_factory: Type[AsyncServerCursor[Row]]
+ row_factory: AsyncRowFactory[Row]
- def __init__(self, pgconn: "PGconn", row_factory: AsyncRowFactory[Row]):
- super().__init__(pgconn, row_factory)
+ def __init__(
+ self,
+ pgconn: "PGconn",
+ row_factory: Optional[AsyncRowFactory[Row]] = None,
+ ):
+ super().__init__(pgconn)
+ self.row_factory = row_factory or cast(AsyncRowFactory[Row], tuple_row)
self.lock = asyncio.Lock()
self.cursor_factory = AsyncCursor
self.server_cursor_factory = AsyncServerCursor
conninfo: str = "",
*,
autocommit: bool = False,
- row_factory: Optional[RowFactory[Row]] = None,
+ row_factory: Optional[AsyncRowFactory[Row]] = None,
**kwargs: Any,
) -> "AsyncConnection[Any]":
conninfo, timeout = _conninfo_connect_timeout(conninfo, **kwargs)
- return await cls._wait_conn(
- cls._connect_gen(
- conninfo, autocommit=autocommit, row_factory=row_factory
- ),
+ rv = await cls._wait_conn(
+ cls._connect_gen(conninfo, autocommit=autocommit),
timeout,
)
+ if row_factory:
+ rv.row_factory = row_factory
+ return rv
async def __aenter__(self) -> "AsyncConnection[Row]":
return self
self._closed = True
self.pgconn.finish()
- @property
- def row_factory(self) -> AsyncRowFactory[Row]:
- """Writable attribute to control how result rows are formed."""
- return cast(AsyncRowFactory[Row], self._row_factory)
-
- @row_factory.setter
- def row_factory(self, row_factory: AsyncRowFactory[Row]) -> None:
- self._row_factory = row_factory
-
@overload
def cursor(self, *, binary: bool = False) -> AsyncCursor[Row]:
...