--- /dev/null
+.. change::
+ :tags: bug, postgresql
+ :tickets: 13059
+
+ Fixed issue where PostgreSQL JSONB operators
+ :meth:`_postgresql.JSONB.Comparator.path_match` and
+ :meth:`_postgresql.JSONB.Comparator.path_exists` were applying incorrect
+ ``VARCHAR`` casts to the right-hand side operand when used with newer
+ PostgreSQL drivers such as psycopg. The operators now indicate the
+ right-hand type as ``JSONPATH``, which currently results in no casting
+ taking place, but is also compatible with explicit casts if the
+ implementation were require it at a later point.
+
+
if TYPE_CHECKING:
from ...engine.interfaces import Dialect
from ...sql.elements import ColumnElement
+ from ...sql.operators import OperatorType
from ...sql.type_api import _BindProcessorType
from ...sql.type_api import _LiteralProcessorType
from ...sql.type_api import TypeEngine
operator_classes = OperatorClass.JSON | OperatorClass.CONCATENABLE
+ def coerce_compared_value(
+ self, op: Optional[OperatorType], value: Any
+ ) -> TypeEngine[Any]:
+ if op in (PATH_MATCH, PATH_EXISTS):
+ return JSON.JSONPathType()
+ else:
+ return super().coerce_compared_value(op, value)
+
class Comparator(JSON.Comparator[_T]):
"""Define comparison operations for :class:`_types.JSON`."""
),
(
lambda self: self.jsoncol.path_exists("$.k1"),
- "test_table.test_column @? %(test_column_1)s::VARCHAR",
+ "test_table.test_column @? %(test_column_1)s",
),
(
lambda self: self.jsoncol.path_exists(self.any_),
),
(
lambda self: self.jsoncol.path_match("$.k1[0] > 2"),
- "test_table.test_column @@ %(test_column_1)s::VARCHAR",
+ "test_table.test_column @@ %(test_column_1)s",
),
(
lambda self: self.jsoncol.path_match(self.any_),
res = connection.scalar(q)
eq_(res, {"k1": {"r6v1": {"subr": [1, 3]}}})
+ @testing.only_on("postgresql >= 12")
+ def test_path_exists(self, connection):
+ self._fixture_data(connection)
+
+ q = select(self.data_table.c.name).where(
+ self.data_table.c.data.path_exists("$.k1")
+ )
+ res = connection.scalars(q).all()
+ eq_(set(res), {"r1", "r2", "r3", "r4", "r5", "r6"})
+
+ q = select(self.data_table.c.name).where(
+ self.data_table.c.data.path_exists("$.k3")
+ )
+ res = connection.scalars(q).all()
+ eq_(res, ["r5"])
+
+ q = select(self.data_table.c.name).where(
+ self.data_table.c.data.path_exists("$.k1.r6v1")
+ )
+ res = connection.scalars(q).all()
+ eq_(res, ["r6"])
+
+ @testing.only_on("postgresql >= 12")
+ def test_path_match(self, connection):
+ self._fixture_data(connection)
+
+ q = select(self.data_table.c.name).where(
+ self.data_table.c.data.path_match("$.k3 > 0")
+ )
+ res = connection.scalars(q).all()
+ eq_(res, ["r5"])
+
+ q = select(self.data_table.c.name).where(
+ self.data_table.c.data.path_match("$.k3 == 5")
+ )
+ res = connection.scalars(q).all()
+ eq_(res, ["r5"])
+
+ q = select(self.data_table.c.name).where(
+ self.data_table.c.data.path_match('$.k1 == "r1v1"')
+ )
+ res = connection.scalars(q).all()
+ eq_(res, ["r1"])
+
class JSONBSuiteTest(suite.JSONTest):
__requires__ = ("postgresql_jsonb",)