self._want_formats,
self._order,
self._parts,
- ) = _query2pg(bquery, self._encoding)
+ ) = self.query2pg(bquery, self._encoding)
else:
self.query = bquery
self._want_formats = self._order = None
This method updates `params` and `types`.
"""
if vars is not None:
- params = _validate_and_reorder_params(self._parts, vars, self._order)
- assert self._want_formats is not None
+ params = self.validate_and_reorder_params(self._parts, vars, self._order)
+ num_params = len(params)
+ if self._want_formats is None:
+ self._want_formats = [PyFormat.AUTO] * num_params
+ assert len(self._want_formats) == num_params
self.params = self._tx.dump_sequence(params, self._want_formats)
self.types = self._tx.types or ()
self.formats = self._tx.formats
self.types = ()
self.formats = None
+ @staticmethod
+ def is_params_sequence(vars: Params) -> bool:
+ # Try concrete types, then abstract types
+ t = type(vars)
+ if t is list or t is tuple:
+ sequence = True
+ elif t is dict:
+ sequence = False
+ elif isinstance(vars, Sequence) and not isinstance(vars, (bytes, str)):
+ sequence = True
+ elif isinstance(vars, Mapping):
+ sequence = False
+ else:
+ raise TypeError(
+ "query parameters should be a sequence or a mapping,"
+ f" got {type(vars).__name__}"
+ )
+ return sequence
+
+ @staticmethod
+ def validate_and_reorder_params(
+ parts: List[QueryPart], vars: Params, order: Optional[List[str]]
+ ) -> Sequence[Any]:
+ """
+ Verify the compatibility between a query and a set of params.
+ """
+ sequence = PostgresQuery.is_params_sequence(vars)
+
+ if sequence:
+ if len(vars) != len(parts) - 1:
+ raise e.ProgrammingError(
+ f"the query has {len(parts) - 1} placeholders but"
+ f" {len(vars)} parameters were passed"
+ )
+ if vars and not isinstance(parts[0].item, int):
+ raise TypeError("named placeholders require a mapping of parameters")
+ return vars # type: ignore[return-value]
+
+ else:
+ if vars and len(parts) > 1 and not isinstance(parts[0][1], str):
+ raise TypeError(
+ "positional placeholders (%s) require a sequence of parameters"
+ )
+ try:
+ return [
+ vars[item] for item in order or () # type: ignore[call-overload]
+ ]
+ except KeyError:
+ raise e.ProgrammingError(
+ "query parameter missing:"
+ f" {', '.join(sorted(i for i in order or () if i not in vars))}"
+ )
+
+ @staticmethod
+ @lru_cache()
+ def query2pg(
+ query: bytes, encoding: str
+ ) -> Tuple[bytes, Optional[List[PyFormat]], Optional[List[str]], List[QueryPart]]:
+ """
+ Convert Python query and params into something Postgres understands.
+
+ - Convert Python placeholders (``%s``, ``%(name)s``) into Postgres
+ format (``$1``, ``$2``)
+ - placeholders can be %s, %t, or %b (auto, text or binary)
+ - return ``query`` (bytes), ``formats`` (list of formats) ``order``
+ (sequence of names used in the query, in the position they appear)
+ ``parts`` (splits of queries and placeholders).
+ """
+ parts = _split_query(query, encoding)
+ order: Optional[List[str]] = None
+ chunks: List[bytes] = []
+ formats = []
+
+ if isinstance(parts[0].item, int):
+ for part in parts[:-1]:
+ assert isinstance(part.item, int)
+ chunks.append(part.pre)
+ chunks.append(b"$%d" % (part.item + 1))
+ formats.append(part.format)
+
+ elif isinstance(parts[0].item, str):
+ seen: Dict[str, Tuple[bytes, PyFormat]] = {}
+ order = []
+ for part in parts[:-1]:
+ assert isinstance(part.item, str)
+ chunks.append(part.pre)
+ if part.item not in seen:
+ ph = b"$%d" % (len(seen) + 1)
+ seen[part.item] = (ph, part.format)
+ order.append(part.item)
+ chunks.append(ph)
+ formats.append(part.format)
+ else:
+ if seen[part.item][1] != part.format:
+ raise e.ProgrammingError(
+ f"placeholder '{part.item}' cannot have different formats"
+ )
+ chunks.append(seen[part.item][0])
+
+ # last part
+ chunks.append(parts[-1].pre)
+
+ return b"".join(chunks), formats, order, parts
+
class PostgresClientQuery(PostgresQuery):
"""
bquery = query
if vars is not None:
- (self.template, self._order, self._parts) = _query2pg_client(
+ (self.template, _, self._order, self._parts) = PostgresClientQuery.query2pg(
bquery, self._encoding
)
else:
This method updates `params` and `types`.
"""
if vars is not None:
- params = _validate_and_reorder_params(self._parts, vars, self._order)
+ params = self.validate_and_reorder_params(self._parts, vars, self._order)
self.params = tuple(
self._tx.as_literal(p) if p is not None else b"NULL" for p in params
)
else:
self.params = None
-
-@lru_cache()
-def _query2pg(
- query: bytes, encoding: str
-) -> Tuple[bytes, List[PyFormat], Optional[List[str]], List[QueryPart]]:
- """
- Convert Python query and params into something Postgres understands.
-
- - Convert Python placeholders (``%s``, ``%(name)s``) into Postgres
- format (``$1``, ``$2``)
- - placeholders can be %s, %t, or %b (auto, text or binary)
- - return ``query`` (bytes), ``formats`` (list of formats) ``order``
- (sequence of names used in the query, in the position they appear)
- ``parts`` (splits of queries and placeholders).
- """
- parts = _split_query(query, encoding)
- order: Optional[List[str]] = None
- chunks: List[bytes] = []
- formats = []
-
- if isinstance(parts[0].item, int):
- for part in parts[:-1]:
- assert isinstance(part.item, int)
- chunks.append(part.pre)
- chunks.append(b"$%d" % (part.item + 1))
- formats.append(part.format)
-
- elif isinstance(parts[0].item, str):
- seen: Dict[str, Tuple[bytes, PyFormat]] = {}
- order = []
- for part in parts[:-1]:
- assert isinstance(part.item, str)
- chunks.append(part.pre)
- if part.item not in seen:
- ph = b"$%d" % (len(seen) + 1)
- seen[part.item] = (ph, part.format)
- order.append(part.item)
- chunks.append(ph)
- formats.append(part.format)
- else:
- if seen[part.item][1] != part.format:
- raise e.ProgrammingError(
- f"placeholder '{part.item}' cannot have different formats"
- )
- chunks.append(seen[part.item][0])
-
- # last part
- chunks.append(parts[-1].pre)
-
- return b"".join(chunks), formats, order, parts
-
-
-@lru_cache()
-def _query2pg_client(
- query: bytes, encoding: str
-) -> Tuple[bytes, Optional[List[str]], List[QueryPart]]:
- """
- Convert Python query and params into a template to perform client-side binding
- """
- parts = _split_query(query, encoding, collapse_double_percent=False)
- order: Optional[List[str]] = None
- chunks: List[bytes] = []
-
- if isinstance(parts[0].item, int):
- for part in parts[:-1]:
- assert isinstance(part.item, int)
- chunks.append(part.pre)
- chunks.append(b"%s")
-
- elif isinstance(parts[0].item, str):
- seen: Dict[str, Tuple[bytes, PyFormat]] = {}
- order = []
- for part in parts[:-1]:
- assert isinstance(part.item, str)
- chunks.append(part.pre)
- if part.item not in seen:
- ph = b"%s"
- seen[part.item] = (ph, part.format)
- order.append(part.item)
- chunks.append(ph)
- else:
- chunks.append(seen[part.item][0])
- order.append(part.item)
-
- # last part
- chunks.append(parts[-1].pre)
-
- return b"".join(chunks), order, parts
-
-
-def _validate_and_reorder_params(
- parts: List[QueryPart], vars: Params, order: Optional[List[str]]
-) -> Sequence[Any]:
- """
- Verify the compatibility between a query and a set of params.
- """
- # Try concrete types, then abstract types
- t = type(vars)
- if t is list or t is tuple:
- sequence = True
- elif t is dict:
- sequence = False
- elif isinstance(vars, Sequence) and not isinstance(vars, (bytes, str)):
- sequence = True
- elif isinstance(vars, Mapping):
- sequence = False
- else:
- raise TypeError(
- "query parameters should be a sequence or a mapping,"
- f" got {type(vars).__name__}"
- )
-
- if sequence:
- if len(vars) != len(parts) - 1:
- raise e.ProgrammingError(
- f"the query has {len(parts) - 1} placeholders but"
- f" {len(vars)} parameters were passed"
- )
- if vars and not isinstance(parts[0].item, int):
- raise TypeError("named placeholders require a mapping of parameters")
- return vars # type: ignore[return-value]
-
- else:
- if vars and len(parts) > 1 and not isinstance(parts[0][1], str):
- raise TypeError(
- "positional placeholders (%s) require a sequence of parameters"
- )
- try:
- return [vars[item] for item in order or ()] # type: ignore[call-overload]
- except KeyError:
- raise e.ProgrammingError(
- "query parameter missing:"
- f" {', '.join(sorted(i for i in order or () if i not in vars))}"
- )
+ @staticmethod
+ @lru_cache()
+ def query2pg(
+ query: bytes, encoding: str
+ ) -> Tuple[bytes, Optional[List[PyFormat]], Optional[List[str]], List[QueryPart]]:
+ """
+ Convert Python query and params into a template to perform client-side binding
+ """
+ parts = _split_query(query, encoding, collapse_double_percent=False)
+ order: Optional[List[str]] = None
+ chunks: List[bytes] = []
+
+ if isinstance(parts[0].item, int):
+ for part in parts[:-1]:
+ assert isinstance(part.item, int)
+ chunks.append(part.pre)
+ chunks.append(b"%s")
+
+ elif isinstance(parts[0].item, str):
+ seen: Dict[str, Tuple[bytes, PyFormat]] = {}
+ order = []
+ for part in parts[:-1]:
+ assert isinstance(part.item, str)
+ chunks.append(part.pre)
+ if part.item not in seen:
+ ph = b"%s"
+ seen[part.item] = (ph, part.format)
+ order.append(part.item)
+ chunks.append(ph)
+ else:
+ chunks.append(seen[part.item][0])
+ order.append(part.item)
+
+ # last part
+ chunks.append(parts[-1].pre)
+
+ return b"".join(chunks), None, order, parts
_re_placeholder = re.compile(