Render bind cast for ``JSON`` and ``JSONB`` datatype on every dialect.
Previously this was only enabled in a subset of dialects.
Fixes: #11994
Change-Id: Ib085deb3e84034dac9e4f4057d32f055d5533e52
--- /dev/null
+.. change::
+ :tags: postgresql, usecase
+ :tickets: 11994
+
+ Render bind cast for ``JSON`` and ``JSONB`` datatype on every dialect.
+ Previously this was only enabled in a subset of dialects.
class AsyncpgJSON(json.JSON):
- render_bind_cast = True
-
def result_processor(self, dialect, coltype):
return None
class AsyncpgJSONB(json.JSONB):
- render_bind_cast = True
-
def result_processor(self, dialect, coltype):
return None
""" # noqa
+ render_bind_cast = True
astext_type = sqltypes.Text()
def __init__(self, none_as_null=False, astext_type=None):
class _PGJSON(JSON):
- render_bind_cast = True
-
def bind_processor(self, dialect):
return self._make_bind_processor(None, dialect._psycopg_Json)
class _PGJSONB(JSONB):
- render_bind_cast = True
-
def bind_processor(self, dialect):
return self._make_bind_processor(None, dialect._psycopg_Jsonb)
from sqlalchemy import Time
from sqlalchemy import true
from sqlalchemy import tuple_
+from sqlalchemy import Uuid
+from sqlalchemy import values
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.dialects.postgresql import REGCONFIG
stmt = select(fn.c.CaseSensitive, fn.c["the % value"])
eq_(connection.execute(stmt).all(), [(1, "foo"), (2, "bar")])
+
+
+class RequiresCastTest(fixtures.TablesTest):
+ __only_on__ = "postgresql"
+ __backend__ = True
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "t",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("uuid", Uuid),
+ Column("j", JSON),
+ Column("jb", JSONB),
+ )
+
+ @classmethod
+ def insert_data(cls, connection):
+ connection.execute(
+ cls.tables["t"].insert(),
+ [
+ {"id": 1, "uuid": "d24587a1-06d9-41df-b1c3-3f423b97a755"},
+ {"id": 2, "uuid": "4b07e1c8-d60c-4ea8-9d01-d7cd01362224"},
+ ],
+ )
+
+ def test_update_values(self, connection):
+ value = values(
+ Column("id", Integer),
+ Column("uuid", Uuid),
+ Column("j", JSON),
+ Column("jb", JSONB),
+ name="update_data",
+ ).data(
+ [
+ (
+ 1,
+ "8b6ec1ec-b979-4d0b-b2ce-9acc6e4c2943",
+ {"foo": 1},
+ {"foo_jb": 1},
+ ),
+ (
+ 2,
+ "a2123bcb-7ea3-420a-8284-1db4b2759d79",
+ {"bar": 2},
+ {"bar_jb": 2},
+ ),
+ ]
+ )
+ connection.execute(
+ self.tables["t"]
+ .update()
+ .values(uuid=value.c.uuid, j=value.c.j, jb=value.c.jb)
+ .where(self.tables["t"].c.id == value.c.id)
+ )
lambda self: self.jsoncol.has_all(
{"name": "r1", "data": {"k1": "r1v1", "k2": "r1v2"}}
),
- "test_table.test_column ?& %(test_column_1)s",
+ "test_table.test_column ?& %(test_column_1)s::JSONB",
),
(
lambda self: self.jsoncol.has_all(self.any_),
),
(
lambda self: self.jsoncol.contains({"k1": "r1v1"}),
- "test_table.test_column @> %(test_column_1)s",
+ "test_table.test_column @> %(test_column_1)s::JSONB",
),
(
lambda self: self.jsoncol.contains(self.any_),
),
(
lambda self: self.jsoncol.contained_by({"foo": "1", "bar": None}),
- "test_table.test_column <@ %(test_column_1)s",
+ "test_table.test_column <@ %(test_column_1)s::JSONB",
),
(
lambda self: self.jsoncol.contained_by(self.any_),