# Allow concatenated string literals from async_to_sync
psycopg/psycopg/connection.py: E501
psycopg_pool/psycopg_pool/pool.py: E501
+ psycopg/psycopg/connection.py: E501
# Pytest's importorskip() getting in the way
tests/types/test_numpy.py: E402
else:
from typing_extensions import TypeVar
+if sys.version_info >= (3, 14):
+ from string.templatelib import Interpolation, Template
+else:
+
+ class Template:
+ pass
+
+ class Interpolation:
+ pass
+
+
__all__ = [
+ "Interpolation",
"LiteralString",
"Self",
+ "Template",
"TypeVar",
]
from . import errors as e
from . import generators, postgres, pq
-from .abc import PQGen, PQGenConn, Query
+from .abc import PQGen, PQGenConn, QueryNoTemplate
from .sql import SQL, Composable
from ._tpc import Xid
from .rows import Row
return conn
def _exec_command(
- self, command: Query, result_format: pq.Format = TEXT
+ self, command: QueryNoTemplate, result_format: pq.Format = TEXT
) -> PQGen[PGresult | None]:
"""
Generator to send a command and receive the result to the backend.
from .abc import Buffer, Params, Query
from .sql import Composable
from ._enums import PyFormat
+from ._compat import Template
from ._encodings import conn_encoding
if TYPE_CHECKING:
The results of this function can be obtained accessing the object
attributes (`query`, `params`, `types`, `formats`).
"""
- query = self._ensure_bytes(query)
+ query = self._ensure_bytes(query, vars)
if vars is not None:
# Avoid caching queries extremely long or with a huge number of
f" {', '.join(sorted(i for i in order or () if i not in vars))}"
)
- def _ensure_bytes(self, query: Query) -> bytes:
+ def from_template(self, query: Template) -> bytes:
+ raise NotImplementedError
+
+ def _ensure_bytes(self, query: Query, vars: Params | None) -> bytes:
if isinstance(query, str):
- return query.encode(self._tx.encoding)
+ return query.encode(self._encoding)
+ elif isinstance(query, Template):
+ if vars is not None:
+ raise TypeError(
+ "'execute()' with string template query doesn't support parameters"
+ )
+ return self.from_template(query)
elif isinstance(query, Composable):
return query.as_bytes(self._tx)
else:
The results of this function can be obtained accessing the object
attributes (`query`, `params`, `types`, `formats`).
"""
- query = self._ensure_bytes(query)
+ query = self._ensure_bytes(query, vars)
if vars is not None:
if (
class PostgresRawQuery(PostgresQuery):
def convert(self, query: Query, vars: Params | None) -> None:
- query = self._ensure_bytes(query)
- self.query = query
+ self.query = self._ensure_bytes(query, vars)
self._want_formats = self._order = None
self.dump(vars)
from . import errors as e
from . import sql
-from .abc import AdaptContext, Query
+from .abc import AdaptContext, QueryNoTemplate
from .rows import dict_row
from ._compat import TypeVar
from ._typemod import TypeModifier
register_array(self, context)
@classmethod
- def _get_info_query(cls, conn: BaseConnection[Any]) -> Query:
+ def _get_info_query(cls, conn: BaseConnection[Any]) -> QueryNoTemplate:
return sql.SQL(
"""\
SELECT
from . import pq
from ._enums import PyFormat as PyFormat
-from ._compat import LiteralString, TypeVar
+from ._compat import LiteralString, Template, TypeVar
if TYPE_CHECKING:
from . import sql
# An object implementing the buffer protocol
Buffer: TypeAlias = Union[bytes, bytearray, memoryview]
-Query: TypeAlias = Union[LiteralString, bytes, "sql.SQL", "sql.Composed"]
+QueryNoTemplate: TypeAlias = Union[LiteralString, bytes, "sql.SQL", "sql.Composed"]
+Query: TypeAlias = Union[QueryNoTemplate, Template]
Params: TypeAlias = Union[Sequence[Any], Mapping[str, Any]]
ConnectionType = TypeVar("ConnectionType", bound="BaseConnection[Any]")
PipelineCommand: TypeAlias = Callable[[], None]
from . import errors as e
from . import pq, waiting
from .abc import RV, AdaptContext, ConnDict, ConnParam, Params, PQGen, Query
+from .abc import QueryNoTemplate
from ._tpc import Xid
from .rows import Row, RowFactory, args_row, tuple_row
from .adapt import AdaptersMap
from ._enums import IsolationLevel
from .cursor import Cursor
-from ._compat import Self
+from ._compat import Self, Template
from ._acompat import Lock
from .conninfo import conninfo_attempts, conninfo_to_dict, make_conninfo
from .conninfo import timeout_from_conninfo
return cur
+ @overload
+ def execute(
+ self,
+ query: QueryNoTemplate,
+ params: Params | None = None,
+ *,
+ prepare: bool | None = None,
+ binary: bool = False,
+ ) -> Cursor[Row]: ...
+
+ @overload
+ def execute(
+ self, query: Template, *, prepare: bool | None = None, binary: bool = False
+ ) -> Cursor[Row]: ...
+
def execute(
self,
query: Query,
if binary:
cur.format = BINARY
- return cur.execute(query, params, prepare=prepare)
+ if isinstance(query, Template):
+ if params is not None:
+ raise TypeError(
+ "'execute()' with string template query doesn't support parameters"
+ )
+ return cur.execute(query, prepare=prepare)
+ else:
+ return cur.execute(query, params, prepare=prepare)
except e._NO_TRACEBACK as ex:
raise ex.with_traceback(None)
from . import errors as e
from . import pq, waiting
from .abc import RV, AdaptContext, ConnDict, ConnParam, Params, PQGen, Query
+from .abc import QueryNoTemplate
from ._tpc import Xid
from .rows import AsyncRowFactory, Row, args_row, tuple_row
from .adapt import AdaptersMap
from ._enums import IsolationLevel
-from ._compat import Self
+from ._compat import Self, Template
from ._acompat import ALock
from .conninfo import conninfo_attempts_async, conninfo_to_dict, make_conninfo
from .conninfo import timeout_from_conninfo
return cur
+ @overload
+ async def execute(
+ self,
+ query: QueryNoTemplate,
+ params: Params | None = None,
+ *,
+ prepare: bool | None = None,
+ binary: bool = False,
+ ) -> AsyncCursor[Row]: ...
+
+ @overload
+ async def execute(
+ self,
+ query: Template,
+ *,
+ prepare: bool | None = None,
+ binary: bool = False,
+ ) -> AsyncCursor[Row]: ...
+
async def execute(
self,
query: Query,
if binary:
cur.format = BINARY
- return await cur.execute(query, params, prepare=prepare)
+ if isinstance(query, Template):
+ if params is not None:
+ raise TypeError(
+ "'execute()' with string template query"
+ " doesn't support parameters"
+ )
+ return await cur.execute(query, prepare=prepare)
+ else:
+ return await cur.execute(query, params, prepare=prepare)
except e._NO_TRACEBACK as ex:
raise ex.with_traceback(None)
from . import errors as e
from . import pq
-from .abc import Params, Query
+from .abc import Params, Query, QueryNoTemplate
from .copy import Copy, Writer
from .rows import Row, RowFactory, RowMaker
-from ._compat import Self
+from ._compat import Self, Template
from ._pipeline import Pipeline
from ._cursor_base import BaseCursor
def _make_row_maker(self) -> RowMaker[Row]:
return self._row_factory(self)
+ @overload
+ def execute(
+ self,
+ query: QueryNoTemplate,
+ params: Params | None = None,
+ *,
+ prepare: bool | None = None,
+ binary: bool | None = None,
+ ) -> Self: ...
+
+ @overload
+ def execute(
+ self,
+ query: Template,
+ *,
+ prepare: bool | None = None,
+ binary: bool | None = None,
+ ) -> Self: ...
+
def execute(
self,
query: Query,
from . import errors as e
from . import pq
-from .abc import Params, Query
+from .abc import Params, Query, QueryNoTemplate
from .copy import AsyncCopy, AsyncWriter
from .rows import AsyncRowFactory, Row, RowMaker
-from ._compat import Self
+from ._compat import Self, Template
from ._cursor_base import BaseCursor
from ._pipeline_async import AsyncPipeline
def _make_row_maker(self) -> RowMaker[Row]:
return self._row_factory(self)
+ @overload
+ async def execute(
+ self,
+ query: QueryNoTemplate,
+ params: Params | None = None,
+ *,
+ prepare: bool | None = None,
+ binary: bool | None = None,
+ ) -> Self: ...
+
+ @overload
+ async def execute(
+ self,
+ query: Template,
+ *,
+ prepare: bool | None = None,
+ binary: bool | None = None,
+ ) -> Self: ...
+
async def execute(
self,
query: Query,
self.python_type: type | None = None
@classmethod
- def _get_info_query(cls, conn: BaseConnection[Any]) -> abc.Query:
+ def _get_info_query(cls, conn: BaseConnection[Any]) -> abc.QueryNoTemplate:
return sql.SQL(
"""\
SELECT
from .. import errors as e
from .. import postgres, sql
from ..pq import Format
-from ..abc import AdaptContext, Query
+from ..abc import AdaptContext, QueryNoTemplate
from ..adapt import Buffer, Dumper, Loader
from .._compat import TypeVar
from .._typeinfo import TypeInfo
self.enum: type[Enum] | None = None
@classmethod
- def _get_info_query(cls, conn: BaseConnection[Any]) -> Query:
+ def _get_info_query(cls, conn: BaseConnection[Any]) -> QueryNoTemplate:
return sql.SQL(
"""\
SELECT name, oid, array_oid, array_agg(label) AS labels
from .. import errors as e
from .. import postgres, sql
from ..pq import Format
-from ..abc import AdaptContext, Buffer, Dumper, DumperKey, Query
+from ..abc import AdaptContext, Buffer, Dumper, DumperKey, QueryNoTemplate
from .range import Range, T, dump_range_binary, dump_range_text, fail_dump
from .range import load_range_binary, load_range_text
from .._oids import INVALID_OID, TEXT_OID
self.subtype_oid = subtype_oid
@classmethod
- def _get_info_query(cls, conn: BaseConnection[Any]) -> Query:
+ def _get_info_query(cls, conn: BaseConnection[Any]) -> QueryNoTemplate:
if conn.info.server_version < 140000:
raise e.NotSupportedError(
"multirange types are only available from PostgreSQL 14"
from .. import errors as e
from .. import postgres, sql
from ..pq import Format
-from ..abc import AdaptContext, Buffer, Dumper, DumperKey, DumpFunc, LoadFunc, Query
+from ..abc import AdaptContext, Buffer, Dumper, DumperKey, DumpFunc, LoadFunc
+from ..abc import QueryNoTemplate
from .._oids import INVALID_OID, TEXT_OID
from ..adapt import PyFormat, RecursiveDumper, RecursiveLoader
from .._compat import TypeVar
self.subtype_oid = subtype_oid
@classmethod
- def _get_info_query(cls, conn: BaseConnection[Any]) -> Query:
+ def _get_info_query(cls, conn: BaseConnection[Any]) -> QueryNoTemplate:
return sql.SQL(
"""\
SELECT t.typname AS name, t.oid AS oid, t.typarray AS array_oid,
-def test_tstring():
- t""
+import pytest
+
+
+async def test_connection_no_params(aconn):
+ with pytest.raises(TypeError):
+ await aconn.execute(t"select 1", [])
+
+
+async def test_cursor_no_params(aconn):
+ cur = aconn.cursor()
+ with pytest.raises(TypeError):
+ await cur.execute(t"select 1", [])