from .adapt import AdaptersMap
from ._enums import IsolationLevel
from .cursor import Cursor
-from ._cmodule import _psycopg
from .conninfo import make_conninfo, conninfo_to_dict, ConnectionInfo
-from .generators import notifies
+from .generators import connect, execute, notifies
from ._encodings import pgconn_encoding
from ._preparing import PrepareManager
from .transaction import Transaction
logger = logging.getLogger("psycopg")
-connect: Callable[[str], PQGenConn["PGconn"]]
-execute: Callable[["PGconn"], PQGen[List["PGresult"]]]
-
# Row Type variable for Cursor (when it needs to be distinguished from the
# connection's one)
CursorRow = TypeVar("CursorRow")
-if _psycopg:
- connect = _psycopg.connect
- execute = _psycopg.execute
-
-else:
- from . import generators
-
- connect = generators.connect
- execute = generators.execute
-
class Notify(NamedTuple):
"""An asynchronous notification received from the database."""
from .copy import Copy
from .rows import Row, RowMaker, RowFactory
from ._column import Column
-from ._cmodule import _psycopg
from ._queries import PostgresQuery
from ._encodings import pgconn_encoding
from ._preparing import Prepare
+from .generators import execute, fetch, send
if TYPE_CHECKING:
from .abc import Transformer
from .pq.abc import PGconn, PGresult
from .connection import Connection
-if _psycopg:
- execute = _psycopg.execute
- fetch = _psycopg.fetch
- send = _psycopg.send
-
-else:
- from . import generators
-
- execute = generators.execute
- fetch = generators.fetch
- send = generators.send
-
_C = TypeVar("_C", bound="Cursor[Any]")
ACTIVE = pq.TransactionStatus.ACTIVE
from .abc import PQGen, PQGenConn
from .pq.abc import PGconn, PGresult
from .waiting import Wait, Ready
+from ._cmodule import _psycopg
from ._encodings import pgconn_encoding, conninfo_encoding
logger = logging.getLogger(__name__)
-def connect(conninfo: str) -> PQGenConn[PGconn]:
+def _connect(conninfo: str) -> PQGenConn[PGconn]:
"""
Generator to create a database connection without blocking.
return conn
-def execute(pgconn: PGconn) -> PQGen[List[PGresult]]:
+def _execute(pgconn: PGconn) -> PQGen[List[PGresult]]:
"""
Generator sending a query and returning results without blocking.
Return the list of results returned by the database (whether success
or error).
"""
- yield from send(pgconn)
- rv = yield from fetch_many(pgconn)
+ yield from _send(pgconn)
+ rv = yield from _fetch_many(pgconn)
return rv
-def send(pgconn: PGconn) -> PQGen[None]:
+def _send(pgconn: PGconn) -> PQGen[None]:
"""
Generator to send a query to the server without blocking.
pgconn.consume_input()
-def fetch_many(pgconn: PGconn) -> PQGen[List[PGresult]]:
+def _fetch_many(pgconn: PGconn) -> PQGen[List[PGresult]]:
"""
Generator retrieving results from the database without blocking.
"""
results: List[PGresult] = []
while 1:
- res = yield from fetch(pgconn)
+ res = yield from _fetch(pgconn)
if not res:
break
return results
-def fetch(pgconn: PGconn) -> PQGen[Optional[PGresult]]:
+def _fetch(pgconn: PGconn) -> PQGen[Optional[PGresult]]:
"""
Generator retrieving a single result from the database without blocking.
return data
# Retrieve the final result of copy
- results = yield from fetch_many(pgconn)
+ results = yield from _fetch_many(pgconn)
if len(results) > 1:
# TODO: too brutal? Copy worked.
raise e.ProgrammingError("you cannot mix COPY with other operations")
break
# Retrieve the final result of copy
- (result,) = yield from fetch_many(pgconn)
+ (result,) = yield from _fetch_many(pgconn)
if result.status != ExecStatus.COMMAND_OK:
encoding = pgconn_encoding(pgconn)
raise e.error_from_result(result, encoding=encoding)
return result
+
+
+# Override functions with fast versions if available
+if _psycopg:
+ connect = _psycopg.connect
+ execute = _psycopg.execute
+ send = _psycopg.send
+ fetch_many = _psycopg.fetch_many
+ fetch = _psycopg.fetch
+
+else:
+ connect = _connect
+ execute = _execute
+ send = _send
+ fetch_many = _fetch_many
+ fetch = _fetch
from . import errors as e
from .abc import ConnectionType, Query, Params, PQGen
from .rows import Row, RowFactory, AsyncRowFactory
-from .cursor import BaseCursor, Cursor, execute
+from .cursor import BaseCursor, Cursor
+from .generators import execute
from .cursor_async import AsyncCursor
if TYPE_CHECKING:
# Details of the params manipulation are in test_conninfo.
import psycopg.connection
- orig_connect = psycopg.connection.connect
+ orig_connect = psycopg.connection.connect # type: ignore
got_conninfo = None