from . import errors as e
from .abc import PipelineCommand, PQGen
from ._compat import Deque, TypeAlias
-from ._cmodule import _psycopg
from ._encodings import pgconn_encoding
from ._preparing import Key, Prepare
+from .generators import pipeline_communicate, fetch_many, send
if TYPE_CHECKING:
from .pq.abc import PGresult
from .connection import BaseConnection, Connection
from .connection_async import AsyncConnection
-if _psycopg:
- pipeline_communicate = _psycopg.pipeline_communicate
- fetch_many = _psycopg.fetch_many
- send = _psycopg.send
-
-else:
- from . import generators
-
- pipeline_communicate = generators.pipeline_communicate
- fetch_many = generators.fetch_many
- send = generators.send
PendingResult: TypeAlias = Union[
None, Tuple["BaseCursor[Any, Any]", Optional[Tuple[Key, Prepare, bytes]]]
from ._enums import IsolationLevel
from .cursor import Cursor
from ._compat import TypeAlias, LiteralString
-from ._cmodule import _psycopg
from .conninfo import make_conninfo, conninfo_to_dict, ConnectionInfo
from ._pipeline import BasePipeline, Pipeline
-from .generators import notifies
+from .generators import notifies, connect, execute
from ._encodings import pgconn_encoding
from ._preparing import PrepareManager
from .transaction import Transaction
from .pq.abc import PGconn, PGresult
from psycopg_pool.base import BasePool
-if _psycopg:
- connect = _psycopg.connect
- execute = _psycopg.execute
-
-else:
- from . import generators
-
- connect = generators.connect
- execute = generators.execute
# Row Type variable for Cursor (when it needs to be distinguished from the
# connection's one)
from .copy import Copy, Writer as CopyWriter
from .rows import Row, RowMaker, RowFactory
from ._column import Column
-from ._cmodule import _psycopg
from ._queries import PostgresQuery, PostgresClientQuery
from ._pipeline import Pipeline
from ._encodings import pgconn_encoding
from ._preparing import Prepare
+from .generators import execute, fetch, send
if TYPE_CHECKING:
from .abc import Transformer
from .pq.abc import PGconn, PGresult
from .connection import Connection
-if _psycopg:
- execute = _psycopg.execute
- fetch = _psycopg.fetch
- send = _psycopg.send
-
-else:
- from . import generators
-
- execute = generators.execute
- fetch = generators.fetch
- send = generators.send
-
TEXT = pq.Format.TEXT
BINARY = pq.Format.BINARY
from .pq.abc import PGconn, PGresult
from .waiting import Wait, Ready
from ._compat import Deque
+from ._cmodule import _psycopg
from ._encodings import pgconn_encoding, conninfo_encoding
OK = pq.ConnStatus.OK
logger = logging.getLogger(__name__)
-def connect(conninfo: str) -> PQGenConn[PGconn]:
+def _connect(conninfo: str) -> PQGenConn[PGconn]:
"""
Generator to create a database connection without blocking.
return conn
-def execute(pgconn: PGconn) -> PQGen[List[PGresult]]:
+def _execute(pgconn: PGconn) -> PQGen[List[PGresult]]:
"""
Generator sending a query and returning results without blocking.
Return the list of results returned by the database (whether success
or error).
"""
- yield from send(pgconn)
- rv = yield from fetch_many(pgconn)
+ yield from _send(pgconn)
+ rv = yield from _fetch_many(pgconn)
return rv
-def send(pgconn: PGconn) -> PQGen[None]:
+def _send(pgconn: PGconn) -> PQGen[None]:
"""
Generator to send a query to the server without blocking.
pgconn.consume_input()
-def fetch_many(pgconn: PGconn) -> PQGen[List[PGresult]]:
+def _fetch_many(pgconn: PGconn) -> PQGen[List[PGresult]]:
"""
Generator retrieving results from the database without blocking.
"""
results: List[PGresult] = []
while True:
- res = yield from fetch(pgconn)
+ res = yield from _fetch(pgconn)
if not res:
break
return results
-def fetch(pgconn: PGconn) -> PQGen[Optional[PGresult]]:
+def _fetch(pgconn: PGconn) -> PQGen[Optional[PGresult]]:
"""
Generator retrieving a single result from the database without blocking.
return pgconn.get_result()
-def pipeline_communicate(
+def _pipeline_communicate(
pgconn: PGconn, commands: Deque[PipelineCommand]
) -> PQGen[List[List[PGresult]]]:
"""Generator to send queries from a connection in pipeline mode while also
return data
# Retrieve the final result of copy
- results = yield from fetch_many(pgconn)
+ results = yield from _fetch_many(pgconn)
if len(results) > 1:
# TODO: too brutal? Copy worked.
raise e.ProgrammingError("you cannot mix COPY with other operations")
break
# Retrieve the final result of copy
- (result,) = yield from fetch_many(pgconn)
+ (result,) = yield from _fetch_many(pgconn)
if result.status != COMMAND_OK:
encoding = pgconn_encoding(pgconn)
raise e.error_from_result(result, encoding=encoding)
return result
+
+
+# Override functions with fast versions if available
+if _psycopg:
+ connect = _psycopg.connect
+ execute = _psycopg.execute
+ send = _psycopg.send
+ fetch_many = _psycopg.fetch_many
+ fetch = _psycopg.fetch
+ pipeline_communicate = _psycopg.pipeline_communicate
+
+else:
+ connect = _connect
+ execute = _execute
+ send = _send
+ fetch_many = _fetch_many
+ fetch = _fetch
+ pipeline_communicate = _pipeline_communicate
from . import errors as e
from .abc import ConnectionType, Query, Params, PQGen
from .rows import Row, RowFactory, AsyncRowFactory
-from .cursor import BaseCursor, Cursor, execute
+from .cursor import BaseCursor, Cursor
+from .generators import execute
from .cursor_async import AsyncCursor
if TYPE_CHECKING:
# Details of the params manipulation are in test_conninfo.
import psycopg.connection
- orig_connect = psycopg.connection.connect
+ orig_connect = psycopg.connection.connect # type: ignore
got_conninfo = None