--- /dev/null
+.. change::
+ :tags: bug, sql
+ :tickets: 6770
+
+ Fixed issue where type-specific bound parameter handlers would not be
+ called upon in the case of using the :meth:`_sql.Insert.values` method with
+ the Python ``None`` value; in particular, this would be noticed when using
+ the :class:`_types.JSON` datatype as well as related PostgreSQL specific
+ types such as :class:`_postgresql.JSONB` which would fail to encode the
+ Python ``None`` value into JSON null, however the issue was generalized to
+ any bound parameter handler in conjunction with this specific method of
+ :class:`_sql.Insert`.
+
def _literal_coercion(
self, element, name=None, type_=None, argname=None, is_crud=False, **kw
):
- if element is None:
+ if (
+ element is None
+ and not is_crud
+ and (type_ is None or not type_.should_evaluate_none)
+ ):
+ # TODO: there's no test coverage now for the
+ # "should_evaluate_none" part of this, as outside of "crud" this
+ # codepath is not normally used except in some special cases
return elements.Null()
else:
try:
:paramref:`_schema.Column.server_default`; a value of ``None``
passed for these parameters means "no default present".
+ Additionally, when used in SQL comparison expressions, the
+ Python value ``None`` continues to refer to SQL null, and not
+ JSON NULL. The :paramref:`_types.JSON.none_as_null` flag refers
+ explicitly to the **persistence** of the value within an
+ INSERT or UPDATE statement. The :attr:`_types.JSON.NULL`
+ value should be used for SQL expressions that wish to compare to
+ JSON null.
+
.. seealso::
:attr:`.types.JSON.NULL`
metadata,
Column("id", Integer, primary_key=True),
Column("name", String(30), nullable=False),
- Column("data", cls.datatype),
+ Column("data", cls.datatype, nullable=False),
Column("nulldata", cls.datatype(none_as_null=True)),
)
eq_(js.mock_calls, [mock.call(data_element)])
eq_(jd.mock_calls, [mock.call(json.dumps(data_element))])
- def test_round_trip_none_as_sql_null(self, connection):
+ @testing.combinations(
+ ("parameters",),
+ ("multiparameters",),
+ ("values",),
+ ("omit",),
+ argnames="insert_type",
+ )
+ def test_round_trip_none_as_sql_null(self, connection, insert_type):
col = self.tables.data_table.c["nulldata"]
conn = connection
- conn.execute(
- self.tables.data_table.insert(), {"name": "r1", "data": None}
- )
+
+ if insert_type == "parameters":
+ stmt, params = self.tables.data_table.insert(), {
+ "name": "r1",
+ "nulldata": None,
+ "data": None,
+ }
+ elif insert_type == "multiparameters":
+ stmt, params = self.tables.data_table.insert(), [
+ {"name": "r1", "nulldata": None, "data": None}
+ ]
+ elif insert_type == "values":
+ stmt, params = (
+ self.tables.data_table.insert().values(
+ name="r1",
+ nulldata=None,
+ data=None,
+ ),
+ {},
+ )
+ elif insert_type == "omit":
+ stmt, params = (
+ self.tables.data_table.insert(),
+ {"name": "r1", "data": None},
+ )
+
+ else:
+ assert False
+
+ conn.execute(stmt, params)
eq_(
conn.scalar(
eq_(conn.scalar(select(col)), None)
- def test_round_trip_none_as_json_null(self):
+ @testing.combinations(
+ ("parameters",),
+ ("multiparameters",),
+ ("values",),
+ argnames="insert_type",
+ )
+ def test_round_trip_none_as_json_null(self, connection, insert_type):
col = self.tables.data_table.c["data"]
- with config.db.begin() as conn:
- conn.execute(
- self.tables.data_table.insert(), {"name": "r1", "data": None}
+ if insert_type == "parameters":
+ stmt, params = self.tables.data_table.insert(), {
+ "name": "r1",
+ "data": None,
+ }
+ elif insert_type == "multiparameters":
+ stmt, params = self.tables.data_table.insert(), [
+ {"name": "r1", "data": None}
+ ]
+ elif insert_type == "values":
+ stmt, params = (
+ self.tables.data_table.insert().values(name="r1", data=None),
+ {},
)
+ else:
+ assert False
- eq_(
- conn.scalar(
- select(self.tables.data_table.c.name).where(
- cast(col, String) == "null"
- )
- ),
- "r1",
- )
+ conn = connection
+ conn.execute(stmt, params)
+
+ eq_(
+ conn.scalar(
+ select(self.tables.data_table.c.name).where(
+ cast(col, String) == "null"
+ )
+ ),
+ "r1",
+ )
- eq_(conn.scalar(select(col)), None)
+ eq_(conn.scalar(select(col)), None)
def test_unicode_round_trip(self):
# note we include Unicode supplementary characters as well
def bind_processor(self, dialect):
def process(value):
+ if value is None:
+ value = "<null value>"
return "BIND_IN" + value
return process
) or (lambda value: value)
def process(value):
+ if value is None:
+ value = "<null value>"
return "BIND_IN" + impl_processor(value)
return process
cache_ok = True
def process_bind_param(self, value, dialect):
+ if value is None:
+ value = u"<null value>"
return "BIND_IN" + value
def process_result_value(self, value, dialect):
cache_ok = True
def process_bind_param(self, value, dialect):
+ if value is None:
+ value = 29
return value * 10
def process_result_value(self, value, dialect):
) or (lambda value: value)
def process(value):
+ if value is None:
+ value = u"<null value>"
+
return "BIND_IN" + impl_processor(value)
return process
goofy10=9,
),
)
+ connection.execute(
+ users.insert(),
+ dict(
+ user_id=5,
+ goofy=None,
+ goofy2=None,
+ goofy4=None,
+ goofy7=None,
+ goofy8=None,
+ goofy9=None,
+ goofy10=None,
+ ),
+ )
def test_processing(self, connection):
users = self.tables.users
result = connection.execute(
users.select().order_by(users.c.user_id)
).fetchall()
- for assertstr, assertint, assertint2, row in zip(
+ eq_(
+ result,
[
- "BIND_INjackBIND_OUT",
- "BIND_INlalaBIND_OUT",
- "BIND_INfredBIND_OUT",
+ (
+ 2,
+ "BIND_INjackBIND_OUT",
+ "BIND_INjackBIND_OUT",
+ "BIND_INjackBIND_OUT",
+ "BIND_INjackBIND_OUT",
+ 1200,
+ 1800,
+ 1200,
+ ),
+ (
+ 3,
+ "BIND_INlalaBIND_OUT",
+ "BIND_INlalaBIND_OUT",
+ "BIND_INlalaBIND_OUT",
+ "BIND_INlalaBIND_OUT",
+ 1500,
+ 2250,
+ 1500,
+ ),
+ (
+ 4,
+ "BIND_INfredBIND_OUT",
+ "BIND_INfredBIND_OUT",
+ "BIND_INfredBIND_OUT",
+ "BIND_INfredBIND_OUT",
+ 900,
+ 1350,
+ 900,
+ ),
+ (
+ 5,
+ "BIND_IN<null value>BIND_OUT",
+ "BIND_IN<null value>BIND_OUT",
+ "BIND_IN<null value>BIND_OUT",
+ "BIND_IN<null value>BIND_OUT",
+ 2900,
+ 4350,
+ 2900,
+ ),
],
- [1200, 1500, 900],
- [1800, 2250, 1350],
- result,
- ):
- for col in list(row)[1:5]:
- eq_(col, assertstr)
- eq_(row[5], assertint)
- eq_(row[6], assertint2)
- for col in row[3], row[4]:
- assert isinstance(col, util.text_type)
+ )
def test_plain_in_typedec(self, connection):
users = self.tables.users
eq_(result.fetchall(), [(3, 1500), (4, 900)])
+class BindProcessorInsertValuesTest(UserDefinedRoundTripTest):
+ """related to #6770, test that insert().values() applies to
+ bound parameter handlers including the None value."""
+
+ __backend__ = True
+
+ def _data_fixture(self, connection):
+ users = self.tables.users
+ connection.execute(
+ users.insert().values(
+ user_id=2,
+ goofy="jack",
+ goofy2="jack",
+ goofy4=util.u("jack"),
+ goofy7=util.u("jack"),
+ goofy8=12,
+ goofy9=12,
+ goofy10=12,
+ ),
+ )
+ connection.execute(
+ users.insert().values(
+ user_id=3,
+ goofy="lala",
+ goofy2="lala",
+ goofy4=util.u("lala"),
+ goofy7=util.u("lala"),
+ goofy8=15,
+ goofy9=15,
+ goofy10=15,
+ ),
+ )
+ connection.execute(
+ users.insert().values(
+ user_id=4,
+ goofy="fred",
+ goofy2="fred",
+ goofy4=util.u("fred"),
+ goofy7=util.u("fred"),
+ goofy8=9,
+ goofy9=9,
+ goofy10=9,
+ ),
+ )
+ connection.execute(
+ users.insert().values(
+ user_id=5,
+ goofy=None,
+ goofy2=None,
+ goofy4=None,
+ goofy7=None,
+ goofy8=None,
+ goofy9=None,
+ goofy10=None,
+ ),
+ )
+
+
class UserDefinedTest(
_UserDefinedTypeFixture, fixtures.TablesTest, AssertsCompiledSQL
):