# Copyright (C) 2022 The Psycopg Team
-import re
from enum import Enum
-from typing import Any, Optional, Type, Union, overload, TYPE_CHECKING
-from ._typeinfo import TypeInfo, TypesRegistry
-
-from . import errors as e
-from .abc import AdaptContext, NoneType
-from .rows import Row, RowFactory, AsyncRowFactory, TupleRow
-from .postgres import TEXT_OID
-from .conninfo import ConnectionInfo
-from .connection import Connection
-from ._adapters_map import AdaptersMap
-from .connection_async import AsyncConnection
-from .types.enum import EnumDumper, EnumBinaryDumper
-from .types.none import NoneDumper
-
-if TYPE_CHECKING:
- from .pq.abc import PGconn
- from .cursor import Cursor
- from .cursor_async import AsyncCursor
+from .._typeinfo import TypeInfo, TypesRegistry
+
+from ..abc import AdaptContext, NoneType
+from ..postgres import TEXT_OID
+from .._adapters_map import AdaptersMap
+from ..types.enum import EnumDumper, EnumBinaryDumper
+from ..types.none import NoneDumper
types = TypesRegistry()
adapters = AdaptersMap(types=types)
-class _CrdbConnectionMixin:
-
- _adapters: Optional[AdaptersMap]
- pgconn: "PGconn"
-
- @classmethod
- def is_crdb(
- cls, conn: Union[Connection[Any], AsyncConnection[Any], "PGconn"]
- ) -> bool:
- """
- Return True if the server connected to ``conn`` is CockroachDB.
- """
- if isinstance(conn, (Connection, AsyncConnection)):
- conn = conn.pgconn
-
- return bool(conn.parameter_status(b"crdb_version"))
-
- @property
- def adapters(self) -> AdaptersMap:
- if not self._adapters:
- # By default, use CockroachDB adapters map
- self._adapters = AdaptersMap(adapters)
-
- return self._adapters
-
- @property
- def info(self) -> "CrdbConnectionInfo":
- return CrdbConnectionInfo(self.pgconn)
-
-
-class CrdbConnection(_CrdbConnectionMixin, Connection[Row]):
- # TODO: this method shouldn't require re-definition if the base class
- # implements a generic self.
- # https://github.com/psycopg/psycopg/issues/308
- @overload
- @classmethod
- def connect(
- cls,
- conninfo: str = "",
- *,
- autocommit: bool = False,
- row_factory: RowFactory[Row],
- prepare_threshold: Optional[int] = 5,
- cursor_factory: "Optional[Type[Cursor[Row]]]" = None,
- context: Optional[AdaptContext] = None,
- **kwargs: Union[None, int, str],
- ) -> "CrdbConnection[Row]":
- ...
-
- @overload
- @classmethod
- def connect(
- cls,
- conninfo: str = "",
- *,
- autocommit: bool = False,
- prepare_threshold: Optional[int] = 5,
- cursor_factory: "Optional[Type[Cursor[Any]]]" = None,
- context: Optional[AdaptContext] = None,
- **kwargs: Union[None, int, str],
- ) -> "CrdbConnection[TupleRow]":
- ...
-
- @classmethod
- def connect(cls, conninfo: str = "", **kwargs: Any) -> "CrdbConnection[Any]":
- return super().connect(conninfo, **kwargs) # type: ignore[return-value]
-
-
-class AsyncCrdbConnection(_CrdbConnectionMixin, AsyncConnection[Row]):
- # TODO: this method shouldn't require re-definition if the base class
- # implements a generic self.
- # https://github.com/psycopg/psycopg/issues/308
- @overload
- @classmethod
- async def connect(
- cls,
- conninfo: str = "",
- *,
- autocommit: bool = False,
- prepare_threshold: Optional[int] = 5,
- row_factory: AsyncRowFactory[Row],
- cursor_factory: "Optional[Type[AsyncCursor[Row]]]" = None,
- context: Optional[AdaptContext] = None,
- **kwargs: Union[None, int, str],
- ) -> "AsyncCrdbConnection[Row]":
- ...
-
- @overload
- @classmethod
- async def connect(
- cls,
- conninfo: str = "",
- *,
- autocommit: bool = False,
- prepare_threshold: Optional[int] = 5,
- cursor_factory: "Optional[Type[AsyncCursor[Any]]]" = None,
- context: Optional[AdaptContext] = None,
- **kwargs: Union[None, int, str],
- ) -> "AsyncCrdbConnection[TupleRow]":
- ...
-
- @classmethod
- async def connect(
- cls, conninfo: str = "", **kwargs: Any
- ) -> "AsyncCrdbConnection[Any]":
- return await super().connect(conninfo, **kwargs) # type: ignore [no-any-return]
-
-
-connect = CrdbConnection.connect
-
-
-class CrdbConnectionInfo(ConnectionInfo):
- @property
- def vendor(self) -> str:
- return "CockroachDB"
-
- @property
- def crdb_version(self) -> int:
- """
- Return the CockroachDB server version connected.
-
- Return None if the server is not CockroachDB, else return a number in
- the PostgreSQL format (e.g. 21.2.10 -> 200210)
-
- Assume all the connections are on the same db: return a cached result on
- following calls.
- """
- sver = self.parameter_status("crdb_version")
- if not sver:
- raise e.InternalError("'crdb_version' parameter status not set")
-
- ver = self.parse_crdb_version(sver)
- if ver is None:
- raise e.InterfaceError(f"couldn't parse CockroachDB version from: {sver!r}")
-
- return ver
-
- @classmethod
- def parse_crdb_version(self, sver: str) -> Optional[int]:
- m = re.search(r"\bv(\d+)\.(\d+)\.(\d+)", sver)
- if not m:
- return None
-
- return int(m.group(1)) * 10000 + int(m.group(2)) * 100 + int(m.group(3))
-
-
class CrdbEnumDumper(EnumDumper):
oid = TEXT_OID
def register_postgres_adapters(context: AdaptContext) -> None:
# Same adapters used by PostgreSQL, or a good starting point for customization
- from .types import array, bool, composite, datetime
- from .types import numeric, string, uuid
+ from ..types import array, bool, composite, datetime
+ from ..types import numeric, string, uuid
array.register_default_adapters(context)
bool.register_default_adapters(context)
def register_crdb_adapters(context: AdaptContext) -> None:
- from . import dbapi20
- from .types import array
+ from .. import dbapi20
+ from ..types import array
register_postgres_adapters(context)
def register_crdb_string_adapters(context: AdaptContext) -> None:
- from .types import string
+ from ..types import string
# Dump strings with text oid instead of unknown.
# Unlike PostgreSQL, CRDB seems able to cast text to most types.
def register_crdb_json_adapters(context: AdaptContext) -> None:
- from .types import json
+ from ..types import json
adapters = context.adapters
def register_crdb_net_adapters(context: AdaptContext) -> None:
- from psycopg.types import net
+ from ..types import net
adapters = context.adapters
# autogenerated: end
]:
types.add(t)
-
-
-register_crdb_adapters(adapters)
--- /dev/null
+"""
+CockroachDB-specific connections.
+"""
+
+# Copyright (C) 2022 The Psycopg Team
+
+import re
+from typing import Any, Optional, Type, Union, overload, TYPE_CHECKING
+
+from .. import errors as e
+from ..abc import AdaptContext
+from ..rows import Row, RowFactory, AsyncRowFactory, TupleRow
+from ..conninfo import ConnectionInfo
+from ..connection import Connection
+from .._adapters_map import AdaptersMap
+from ..connection_async import AsyncConnection
+from ._types import adapters
+
+if TYPE_CHECKING:
+ from ..pq.abc import PGconn
+ from ..cursor import Cursor
+ from ..cursor_async import AsyncCursor
+
+
+class _CrdbConnectionMixin:
+
+ _adapters: Optional[AdaptersMap]
+ pgconn: "PGconn"
+
+ @classmethod
+ def is_crdb(
+ cls, conn: Union[Connection[Any], AsyncConnection[Any], "PGconn"]
+ ) -> bool:
+ """
+ Return True if the server connected to ``conn`` is CockroachDB.
+ """
+ if isinstance(conn, (Connection, AsyncConnection)):
+ conn = conn.pgconn
+
+ return bool(conn.parameter_status(b"crdb_version"))
+
+ @property
+ def adapters(self) -> AdaptersMap:
+ if not self._adapters:
+ # By default, use CockroachDB adapters map
+ self._adapters = AdaptersMap(adapters)
+
+ return self._adapters
+
+ @property
+ def info(self) -> "CrdbConnectionInfo":
+ return CrdbConnectionInfo(self.pgconn)
+
+
+class CrdbConnection(_CrdbConnectionMixin, Connection[Row]):
+ """
+ Wrapper for a connection to a CockroachDB database.
+ """
+
+ __module__ = "psycopg.crdb"
+
+ # TODO: this method shouldn't require re-definition if the base class
+ # implements a generic self.
+ # https://github.com/psycopg/psycopg/issues/308
+ @overload
+ @classmethod
+ def connect(
+ cls,
+ conninfo: str = "",
+ *,
+ autocommit: bool = False,
+ row_factory: RowFactory[Row],
+ prepare_threshold: Optional[int] = 5,
+ cursor_factory: "Optional[Type[Cursor[Row]]]" = None,
+ context: Optional[AdaptContext] = None,
+ **kwargs: Union[None, int, str],
+ ) -> "CrdbConnection[Row]":
+ ...
+
+ @overload
+ @classmethod
+ def connect(
+ cls,
+ conninfo: str = "",
+ *,
+ autocommit: bool = False,
+ prepare_threshold: Optional[int] = 5,
+ cursor_factory: "Optional[Type[Cursor[Any]]]" = None,
+ context: Optional[AdaptContext] = None,
+ **kwargs: Union[None, int, str],
+ ) -> "CrdbConnection[TupleRow]":
+ ...
+
+ @classmethod
+ def connect(cls, conninfo: str = "", **kwargs: Any) -> "CrdbConnection[Any]":
+ """
+ Connect to a database server and return a new `CrdbConnection` instance.
+ """
+ return super().connect(conninfo, **kwargs) # type: ignore[return-value]
+
+
+class AsyncCrdbConnection(_CrdbConnectionMixin, AsyncConnection[Row]):
+ """
+ Wrapper for an async connection to a CockroachDB database.
+ """
+
+ __module__ = "psycopg.crdb"
+
+ # TODO: this method shouldn't require re-definition if the base class
+ # implements a generic self.
+ # https://github.com/psycopg/psycopg/issues/308
+ @overload
+ @classmethod
+ async def connect(
+ cls,
+ conninfo: str = "",
+ *,
+ autocommit: bool = False,
+ prepare_threshold: Optional[int] = 5,
+ row_factory: AsyncRowFactory[Row],
+ cursor_factory: "Optional[Type[AsyncCursor[Row]]]" = None,
+ context: Optional[AdaptContext] = None,
+ **kwargs: Union[None, int, str],
+ ) -> "AsyncCrdbConnection[Row]":
+ ...
+
+ @overload
+ @classmethod
+ async def connect(
+ cls,
+ conninfo: str = "",
+ *,
+ autocommit: bool = False,
+ prepare_threshold: Optional[int] = 5,
+ cursor_factory: "Optional[Type[AsyncCursor[Any]]]" = None,
+ context: Optional[AdaptContext] = None,
+ **kwargs: Union[None, int, str],
+ ) -> "AsyncCrdbConnection[TupleRow]":
+ ...
+
+ @classmethod
+ async def connect(
+ cls, conninfo: str = "", **kwargs: Any
+ ) -> "AsyncCrdbConnection[Any]":
+ return await super().connect(conninfo, **kwargs) # type: ignore [no-any-return]
+
+
+class CrdbConnectionInfo(ConnectionInfo):
+ """
+ `~psycopg.ConnectionInfo` subclass to get info about a CockroachDB database.
+ """
+
+ __module__ = "psycopg.crdb"
+
+ @property
+ def vendor(self) -> str:
+ return "CockroachDB"
+
+ @property
+ def crdb_version(self) -> int:
+ """
+ Return the CockroachDB server version connected.
+
+ Return None if the server is not CockroachDB, else return a number in
+ the PostgreSQL format (e.g. 21.2.10 -> 200210)
+
+ Assume all the connections are on the same db: return a cached result on
+ following calls.
+ """
+ sver = self.parameter_status("crdb_version")
+ if not sver:
+ raise e.InternalError("'crdb_version' parameter status not set")
+
+ ver = self.parse_crdb_version(sver)
+ if ver is None:
+ raise e.InterfaceError(f"couldn't parse CockroachDB version from: {sver!r}")
+
+ return ver
+
+ @classmethod
+ def parse_crdb_version(self, sver: str) -> Optional[int]:
+ m = re.search(r"\bv(\d+)\.(\d+)\.(\d+)", sver)
+ if not m:
+ return None
+
+ return int(m.group(1)) * 10000 + int(m.group(2)) * 100 + int(m.group(3))