--- /dev/null
+.. change::
+ :tags: feature, oracle
+ :tickets: 10375
+
+ Added support for the :class:`_sqltypes.JSON` datatype when using the
+ Oracle database with the oracledb dialect. JSON values are serialized and
+ deserialized using configurable strategies that accommodate Oracle's native
+ JSON type available as of Oracle 21c. Pull request courtesy Abdallah
+ Alhadad.
+
+ .. seealso::
+
+ :class:`_oracle.JSON` - Oracle-specific JSON class that includes
+ implementation and platform notes.
+
+ :ref:`oracledb_json`
.. autoclass:: INTERVAL
:members: __init__
+.. autoclass:: JSON
+ :members: __init__
+
.. autoclass:: NCLOB
:members: __init__
from ...sql.functions import random
from ...sql.functions import rollup
from ...sql.functions import sysdate
+ from ...sql.sqltypes import _JSON_VALUE
from ...sql.type_api import TypeEngine
from ...sql.visitors import ExternallyTraversible
from ...util.typing import TupleAny
def __init__(
self,
- json_serializer: Optional[Callable[..., Any]] = None,
- json_deserializer: Optional[Callable[..., Any]] = None,
+ json_serializer: Callable[[_JSON_VALUE], str] | None = None,
+ json_deserializer: Callable[[str], _JSON_VALUE] | None = None,
is_mariadb: Optional[bool] = None,
**kwargs: Any,
) -> None:
from .base import VECTOR
from .base import VectorIndexConfig
from .base import VectorIndexType
+from .json import JSON
from .vector import SparseVector
from .vector import VectorDistanceType
from .vector import VectorStorageFormat
"VectorStorageFormat",
"VectorStorageType",
"SparseVector",
+ "JSON",
)
from functools import lru_cache
from functools import wraps
import re
+from typing import Any
+from typing import Callable
+from typing import TYPE_CHECKING
from . import dictionary
+from .json import JSON
+from .json import JSONIndexType
+from .json import JSONPathType
from .types import _OracleBoolean
from .types import _OracleDate
from .types import BFILE
from ...sql import and_
from ...sql import bindparam
from ...sql import compiler
+from ...sql import elements
from ...sql import expression
from ...sql import func
from ...sql import null
from ...sql import sqltypes
from ...sql import util as sql_util
from ...sql import visitors
+from ...sql.base import NO_ARG
from ...sql.compiler import AggregateOrderByStyle
from ...sql.visitors import InternalTraversal
from ...types import BLOB
from ...types import REAL
from ...types import VARCHAR
+if TYPE_CHECKING:
+ from ...sql.sqltypes import _JSON_VALUE
+
RESERVED_WORDS = set(
"SHARE RAW DROP BETWEEN FROM DESC OPTION PRIOR LONG THEN "
"DEFAULT ALTER IS INTO MINUS INTEGER NUMBER GRANT IDENTIFIED "
sqltypes.Interval: INTERVAL,
sqltypes.DateTime: DATE,
sqltypes.Date: _OracleDate,
+ sqltypes.JSON: JSON,
+ sqltypes.JSON.JSONIndexType: JSONIndexType,
+ sqltypes.JSON.JSONPathType: JSONPathType,
}
ischema_names = {
"ROWID": ROWID,
"BOOLEAN": BOOLEAN,
"VECTOR": VECTOR,
+ "JSON": JSON,
}
)
return f"VECTOR({dim},{storage_format},{storage_type})"
+ def visit_JSON(self, type_: JSON, **kw: Any) -> str:
+ use_blob = (
+ not self.dialect._supports_oracle_json
+ if getattr(type_, "use_blob", NO_ARG) is NO_ARG
+ else type_.use_blob
+ )
+
+ if use_blob:
+ return "BLOB"
+ else:
+ return "JSON"
+
class OracleCompiler(compiler.SQLCompiler):
"""Oracle compiler modifies the lexical structure of Select
def visit_false(self, expr, **kw):
return "0"
+ def visit_cast(self, cast, **kwargs):
+ # Oracle requires VARCHAR2 to have a length in CAST expressions
+ # Adapt String types to VARCHAR2 with appropriate length
+ type_ = cast.typeclause.type
+ if isinstance(type_, sqltypes.String) and not isinstance(
+ type_, (sqltypes.Text, sqltypes.CLOB)
+ ):
+ adapted = VARCHAR2._adapt_string_for_cast(type_)
+ type_clause = self.dialect.type_compiler_instance.process(adapted)
+ else:
+ type_clause = cast.typeclause._compiler_dispatch(self, **kwargs)
+
+ return "CAST(%s AS %s)" % (
+ cast.clause._compiler_dispatch(self, **kwargs),
+ type_clause,
+ )
+
def get_cte_preamble(self, recursive):
return "WITH"
def visit_bitwise_not_op_unary_operator(self, element, operator, **kw):
raise exc.CompileError("Cannot compile bitwise_not in oracle")
+ def _render_json_extract_from_binary(self, binary, operator, **kw):
+ literal_kw = kw.copy()
+ literal_kw["literal_binds"] = True
+
+ left = self.process(binary.left, **kw)
+ right = self.process(binary.right, **literal_kw)
+
+ if binary.type._type_affinity is sqltypes.Boolean:
+ # RETURNING clause doesn't handle true/false to 1/0
+ # mapping, so use CASE expression for boolean
+ return (
+ f"CASE JSON_VALUE({left}, {right})"
+ f" WHEN 'true' THEN 1"
+ f" WHEN 'false' THEN 0"
+ f" ELSE CAST(JSON_VALUE({left}, {right})"
+ f" AS NUMBER(1)) END"
+ )
+ elif binary.type._type_affinity is sqltypes.Integer:
+ json_value_returning = "INTEGER"
+ elif binary.type._type_affinity in (
+ sqltypes.Numeric,
+ sqltypes.Float,
+ ):
+ if isinstance(binary.type, sqltypes.Float):
+ json_value_returning = "FLOAT"
+ else:
+ json_value_returning = (
+ f"NUMBER({binary.type.precision}, {binary.type.scale})"
+ )
+ elif binary.type._type_affinity is sqltypes.String:
+ json_value_returning = "VARCHAR2(4000)"
+ else:
+ # binary.type._type_affinity is sqltypes.JSON
+ # or other
+ return f"JSON_QUERY({left}, {right})"
+
+ return (
+ f"JSON_VALUE({left}, {right}"
+ f" RETURNING {json_value_returning} ERROR ON ERROR)"
+ )
+
+ def visit_json_getitem_op_binary(
+ self, binary: elements.BinaryExpression[Any], operator: Any, **kw: Any
+ ) -> str:
+ return self._render_json_extract_from_binary(binary, operator, **kw)
+
+ def visit_json_path_getitem_op_binary(
+ self, binary: elements.BinaryExpression[Any], operator: Any, **kw: Any
+ ) -> str:
+ return self._render_json_extract_from_binary(binary, operator, **kw)
+
class OracleDDLCompiler(compiler.DDLCompiler):
supports_empty_insert = False
supports_identity_columns = True
+ _supports_oracle_json = True
+
aggregate_order_by_style = AggregateOrderByStyle.WITHIN_GROUP
statement_compiler = OracleCompiler
use_nchar_for_unicode=False,
exclude_tablespaces=("SYSTEM", "SYSAUX"),
enable_offset_fetch=True,
+ json_serializer: Callable[[_JSON_VALUE], str] | None = None,
+ json_deserializer: Callable[[str], _JSON_VALUE] | None = None,
**kwargs,
):
default.DefaultDialect.__init__(self, **kwargs)
self.enable_offset_fetch = self._supports_offset_fetch = (
enable_offset_fetch
)
+ self._json_serializer = json_serializer
+ self._json_deserializer = json_deserializer
def initialize(self, connection):
super().initialize(connection)
self.colspecs.pop(sqltypes.Interval)
self.use_ansi = False
+ self._supports_oracle_json = self.server_version_info >= (21,)
self.supports_native_boolean = self.server_version_info >= (23,)
self.supports_identity_columns = self.server_version_info >= (12,)
self._supports_offset_fetch = (
from __future__ import annotations
import decimal
+import json
import random
import re
from .base import OracleCompiler
from .base import OracleDialect
from .base import OracleExecutionContext
+from .json import JSON
from .types import _OracleDateLiteralRender
from ... import exc
from ... import util
from ...engine import processors
from ...sql import sqltypes
from ...sql._typing import is_sql_compiler
+from ...sql.base import NO_ARG
from ...sql.sqltypes import Boolean
# source:
# https://github.com/oracle/python-cx_Oracle/issues/596#issuecomment-999243649
_CX_ORACLE_MAGIC_LOB_SIZE = 131072
+# largest JSON we can deserialize if we are not using
+# DB_TYPE_JSON
+_CX_ORACLE_MAX_JSON_CONVERTED = 32767
+
+
+class _OracleJson(JSON):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.DB_TYPE_JSON
+
+ def _should_use_blob(self, dialect):
+ use_blob = (
+ not dialect._supports_oracle_json
+ if self.use_blob is NO_ARG
+ else self.use_blob
+ )
+
+ return use_blob
+
+ def bind_processor(self, dialect):
+
+ if self._should_use_blob(dialect):
+
+ DBAPIBinary = dialect.dbapi.Binary
+
+ def string_process(value):
+ if value is not None:
+ # utf-8 is standard for oracledb
+ # https://python-oracledb.readthedocs.io/en/latest/user_guide/globalization.html#setting-the-client-character-set # noqa: E501
+ return DBAPIBinary(value.encode("utf-8"))
+ else:
+ return None
+
+ else:
+ string_process = None
+
+ json_serializer = dialect._json_serializer or json.dumps
+
+ return self._make_bind_processor(string_process, json_serializer)
+
+ def result_processor(self, dialect, coltype):
+ if self._should_use_blob(dialect):
+ # for plain BLOB, use traditional binary decode + json.loads()
+ string_process = self._str_impl.result_processor(dialect, coltype)
+ json_deserializer = dialect._json_deserializer or json.loads
+
+ def process(value):
+ if value is None:
+ return None
+ if string_process:
+ value = string_process(value)
+ return json_deserializer(value)
+
+ return process
+
+ else:
+ # for JSON, json decoder is set as an outputtypehandler
+ return None
+
class _OracleInteger(sqltypes.Integer):
def get_dbapi_type(self, dbapi):
update_executemany_returning = True
delete_executemany_returning = True
+ supports_native_json_serialization = False
+ supports_native_json_deserialization = False
+ dialect_injects_custom_json_deserializer = True
+
bind_typing = interfaces.BindTyping.SETINPUTSIZES
driver = "cx_oracle"
sqltypes.Float: _OracleFloat,
oracle.BINARY_FLOAT: _OracleBINARY_FLOAT,
oracle.BINARY_DOUBLE: _OracleBINARY_DOUBLE,
+ sqltypes.JSON: _OracleJson,
sqltypes.Integer: _OracleInteger,
oracle.NUMBER: _OracleNUMBER,
sqltypes.Date: _CXOracleDate,
dbapi_module.FIXED_NCHAR,
dbapi_module.FIXED_CHAR,
dbapi_module.TIMESTAMP,
+            # we don't make use of Oracle's JSON serialization; it
+            # does not handle "none as null"
+ # dbapi_module.DB_TYPE_JSON,
int, # _OracleInteger,
# _OracleBINARY_FLOAT, _OracleBINARY_DOUBLE,
dbapi_module.NATIVE_FLOAT,
_CX_ORACLE_MAGIC_LOB_SIZE,
cursor.arraysize,
)
+ elif (
+ default_type is cx_Oracle.DB_TYPE_JSON
+ and dialect._json_deserializer is not None
+ ):
+ return cursor.var(
+ cx_Oracle.DB_TYPE_VARCHAR,
+ _CX_ORACLE_MAX_JSON_CONVERTED,
+ cursor.arraysize,
+ outconverter=dialect._json_deserializer,
+ )
return output_type_handler
--- /dev/null
+# dialects/oracle/json.py
+# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+# mypy: ignore-errors
+
+from __future__ import annotations
+
+from typing import Any
+from typing import TYPE_CHECKING
+from typing import TypeVar
+
+from ... import types as sqltypes
+from ...sql.base import _NoArg
+from ...sql.base import NO_ARG
+from ...sql.sqltypes import _T_JSON
+
+
+if TYPE_CHECKING:
+ from ...engine.interfaces import Dialect
+ from ...sql.type_api import _BindProcessorType
+ from ...sql.type_api import _LiteralProcessorType
+
+_T = TypeVar("_T", bound=Any)
+
+
+class JSON(sqltypes.JSON[_T_JSON]):
+ """Oracle JSON type.
+
+ .. versionadded:: 2.1
+
+ Oracle Database supports JSON storage and querying for character and BLOB
+ datatypes in Oracle 12c, and supports a dedicated JSON data type as of
+ Oracle 21c. SQLAlchemy supports both of these scenarios when using the
+ oracledb DBAPI. This type is used implicitly whenever the base
+ :class:`_types.JSON` datatype is used against an Oracle backend, or may be
+ constructed directly for access to Oracle-specific parameters such as
+ :paramref:`_oracle.JSON.use_blob`.
+
+ Index operations are adapted to render using the ``JSON_QUERY`` and
+ ``JSON_VALUE`` functions at the database level.
+
+ **Platform Support** - When using Oracle Database versions prior to 21c,
+ BLOB is used as the storage format. In 21c or later, the native JSON
+ datatype is used. This can be overridden using the
+ :paramref:`_oracle.JSON.use_blob` parameter.
+
+ **Serialization / Deserialization** - JSON serialization of bound
+ parameters uses Python ``json.dumps()`` by default rather than oracledb's
+ native serializer, in order to support the
+ :paramref:`_sqltypes.JSON.none_as_null` feature. The default serializer
+ does **not** accept Python ``Decimal`` objects; to use a custom serializer,
+ pass :paramref:`_sa.create_engine.json_serializer` to
+ :func:`_sa.create_engine`.
+
+ When using the native JSON datatype (21c+), deserialization uses oracledb's
+ native deserializer by default, which is required for JSON values larger
+ than 32767 bytes. However, this deserializer returns all numeric values as
+ ``Decimal`` since Oracle Database stores JSON numbers using its internal
+ NUMBER type. To receive standard Python numeric types, pass
+ ``json_deserializer=json.loads`` via
+ :paramref:`_sa.create_engine.json_deserializer`; note that this limits
+ maximum JSON value size to 32767 bytes. When using BLOB storage,
+ SQLAlchemy deserializes using ``json.loads()`` directly rather than the
+ oracledb deserializer.
+
+ **CHECK Constraint with BLOB** - When using BLOB storage, either on Oracle
+ Database versions prior to 21c or via the :paramref:`_oracle.JSON.use_blob`
+ parameter, the oracledb driver documentation recommends adding a
+ ``<colname> IS JSON`` check constraint to indicate to the driver that the
+ column stores JSON data. This constraint is **not** automatically
+ generated by :class:`_oracle.JSON` and is not required by SQLAlchemy's
+ implementation in order to read JSON data from the column. If desired, it
+ can be added explicitly using :class:`_schema.CheckConstraint`.
+
+ .. seealso::
+
+ :class:`_types.JSON` - main documentation for the generic
+ cross-platform JSON datatype.
+
+ """
+
+ use_blob: bool | _NoArg
+
+ def __init__(
+ self, none_as_null: bool = False, use_blob: bool | _NoArg = NO_ARG
+ ):
+ """Construct a :class:`_oracle.JSON` type.
+
+ :param none_as_null=False: if True, persist the value ``None`` as a SQL
+ NULL value, not the JSON encoding of ``null``. See the notes at
+ :paramref:`_sqltypes.JSON.none_as_null` for complete background on
+ this option.
+
+ :param use_blob: A boolean parameter indicating if the type should be
+ rendered in DDL using BLOB instead of JSON. Normally, JSON or BLOB
+ is chosen automatically based on the version of Oracle in use
+ (21c or greater for JSON). If the parameter is left at its default
+ value of the ``NO_ARG`` constant, this automatic selection is used.
+ However when ``True``, the BLOB datatype will be used unconditionally,
+ and if ``False``, JSON will be used unconditionally (including on
+        backends older than 21c, where the server will raise an error.
+ This may be used to assert that only JSON-supporting backends
+ should be used).
+
+ """
+
+ super().__init__(none_as_null=none_as_null)
+ self.use_blob = use_blob
+
+
+class _FormatTypeMixin:
+ def _format_value(self, value: Any) -> str:
+ raise NotImplementedError()
+
+ def bind_processor(self, dialect: Dialect) -> _BindProcessorType[Any]:
+ super_proc = self.string_bind_processor(dialect) # type: ignore[attr-defined] # noqa: E501
+
+ def process(value: Any) -> Any:
+ value = self._format_value(value)
+ if super_proc:
+ value = super_proc(value)
+ return value
+
+ return process
+
+ def literal_processor(
+ self, dialect: Dialect
+ ) -> _LiteralProcessorType[Any]:
+ super_proc = self.string_literal_processor(dialect) # type: ignore[attr-defined] # noqa: E501
+
+ def process(value: Any) -> str:
+ value = self._format_value(value)
+ if super_proc:
+ value = super_proc(value)
+ return value # type: ignore[no-any-return]
+
+ return process
+
+
+class JSONIndexType(_FormatTypeMixin, sqltypes.JSON.JSONIndexType):
+ def _format_value(self, value: Any) -> str:
+ if isinstance(value, int):
+ return f"$[{value}]"
+ else:
+ return f'$."{value}"'
+
+
+class JSONPathType(_FormatTypeMixin, sqltypes.JSON.JSONPathType):
+ def _format_value(self, value: Any) -> str:
+ return "$%s" % (
+ "".join(
+ f"[{elem}]" if isinstance(elem, int) else f'."{elem}"'
+ for elem in value
+ )
+ )
.. versionadded:: 2.0.0 added support for the python-oracledb driver.
+.. _oracledb_json:
+
+JSON Support
+------------
+
+Oracle Database supports a native JSON datatype as of version 21c, as well as
+support for JSON functions on character and BLOB columns as of version 12c. The
+SQLAlchemy :class:`_sqltypes.JSON` datatype may be used with the oracledb
+backend in the same way it works with any other backend, with some slight
+behavioral changes particularly when using the native JSON datatype. See
+:class:`_oracle.JSON` for platform-specific notes.
+
+.. versionadded:: 2.1 added JSON support for the Oracle backend.
+
+
""" # noqa
from __future__ import annotations
class VARCHAR2(VARCHAR):
__visit_name__ = "VARCHAR2"
+ @classmethod
+ def _adapt_string_for_cast(cls, type_: sqltypes.String) -> "VARCHAR2":
+ """Adapt a String type for use in CAST expressions.
+
+ Oracle requires a length for VARCHAR2 in CAST expressions.
+ If no length is specified, we default to 4000 (max for VARCHAR2).
+ """
+ type_ = sqltypes.to_instance(type_)
+ if isinstance(type_, VARCHAR2):
+ return type_
+ elif isinstance(type_, VARCHAR):
+ return VARCHAR2(
+ length=type_.length or 4000, collation=type_.collation
+ )
+ else:
+ return VARCHAR2(length=type_.length or 4000)
+
NVARCHAR2 = NVARCHAR
render_bind_cast = True
-class AsyncpgJSON(json.JSON):
- def result_processor(self, dialect, coltype):
- return None
-
-
-class AsyncpgJSONB(json.JSONB):
- def result_processor(self, dialect, coltype):
- return None
-
-
class AsyncpgJSONIndexType(sqltypes.JSON.JSONIndexType):
pass
statement_compiler = PGCompiler_asyncpg
preparer = PGIdentifierPreparer_asyncpg
+ supports_native_json_serialization = False
+ supports_native_json_deserialization = True
+ dialect_injects_custom_json_deserializer = True
+
colspecs = util.update_copy(
PGDialect.colspecs,
{
sqltypes.BigInteger: AsyncpgBigInteger,
sqltypes.Numeric: AsyncpgNumeric,
sqltypes.Float: AsyncpgFloat,
- sqltypes.JSON: AsyncpgJSON,
sqltypes.LargeBinary: AsyncpgByteA,
- json.JSONB: AsyncpgJSONB,
sqltypes.JSON.JSONPathType: AsyncpgJSONPathType,
sqltypes.JSON.JSONIndexType: AsyncpgJSONIndexType,
sqltypes.JSON.JSONIntIndexType: AsyncpgJSONIntIndexType,
class _PGJSON(JSON):
render_bind_cast = True
- def result_processor(self, dialect, coltype):
- return None
-
class _PGJSONB(JSONB):
render_bind_cast = True
- def result_processor(self, dialect, coltype):
- return None
-
class _PGJSONIndexType(sqltypes.JSON.JSONIndexType):
def get_dbapi_type(self, dbapi):
preparer = PGIdentifierPreparer_pg8000
supports_server_side_cursors = True
+ supports_native_json_serialization = False
+ supports_native_json_deserialization = True
+ dialect_injects_custom_json_deserializer = True
+
render_bind_cast = True
# reversed as of pg8000 1.16.6. 1.16.5 and lower
class _PGJSON(JSON):
def bind_processor(self, dialect):
+ """psycopg's bind processor is assembled on the type adapter,
+ but we still need to wrap the value in a psycopg.Json() object"""
return self._make_bind_processor(None, dialect._psycopg_Json)
- def result_processor(self, dialect, coltype):
- return None
-
class _PGJSONB(JSONB):
def bind_processor(self, dialect):
+ """psycopg's bind processor is assembled on the type adapter,
+ but we still need to wrap the value in a psycopg.Jsonb() object"""
return self._make_bind_processor(None, dialect._psycopg_Jsonb)
- def result_processor(self, dialect, coltype):
- return None
-
class _PGJSONIntIndexType(sqltypes.JSON.JSONIntIndexType):
__visit_name__ = "json_int_index"
default_paramstyle = "pyformat"
supports_sane_multi_rowcount = True
+ supports_native_json_serialization = True
+ supports_native_json_deserialization = True
+ dialect_injects_custom_json_deserializer = True
+
execution_ctx_cls = PGExecutionContext_psycopg
statement_compiler = PGCompiler_psycopg
preparer = PGIdentifierPreparer_psycopg
from ...engine.interfaces import DBAPIConnection
from ...engine.interfaces import Dialect
from ...engine.interfaces import IsolationLevel
+ from ...sql.sqltypes import _JSON_VALUE
from ...sql.type_api import _BindProcessorType
from ...sql.type_api import _ResultProcessorType
def __init__(
self,
native_datetime: bool = False,
- json_serializer: Optional[Callable[..., Any]] = None,
- json_deserializer: Optional[Callable[..., Any]] = None,
+ json_serializer: Callable[[_JSON_VALUE], str] | None = None,
+ json_deserializer: Callable[[str], _JSON_VALUE] | None = None,
**kwargs: Any,
) -> None:
default.DefaultDialect.__init__(self, **kwargs)
:param json_deserializer: for dialects that support the
:class:`_types.JSON`
datatype, this is a Python callable that will convert a JSON string
- to a Python object. By default, the Python ``json.loads`` function is
- used.
+ to a Python object. By default, either the driver's built-in
+ capabilities are used, or if none are available, the Python
+ ``json.loads`` function is used.
:param json_serializer: for dialects that support the :class:`_types.JSON`
- datatype, this is a Python callable that will render a given object
- as JSON. By default, the Python ``json.dumps`` function is used.
+ datatype, this is a Python callable that will render a given object as
+ JSON. By default, the Python ``json.dumps`` function is used.
:param label_length=None: optional integer value which limits
the size of dynamically generated column labels to that many
from ..sql.dml import UpdateBase
from ..sql.elements import BindParameter
from ..sql.schema import Column
+ from ..sql.sqltypes import _JSON_VALUE
from ..sql.type_api import _BindProcessorType
from ..sql.type_api import _ResultProcessorType
from ..sql.type_api import TypeEngine
supports_native_uuid = False
returns_native_bytes = False
+ supports_native_json_serialization = False
+ supports_native_json_deserialization = False
+ dialect_injects_custom_json_deserializer = False
+ _json_serializer: Callable[[_JSON_VALUE], str] | None = None
+
+ _json_deserializer: Callable[[str], _JSON_VALUE] | None = None
+
non_native_boolean_check_constraint = True
supports_simple_order_by_label = True
from ..sql.schema import DefaultGenerator
from ..sql.schema import SchemaItem
from ..sql.schema import Sequence as Sequence_SchemaItem
+ from ..sql.sqltypes import _JSON_VALUE
from ..sql.sqltypes import Integer
from ..sql.type_api import _TypeMemoDict
from ..sql.type_api import TypeEngine
"""
+ _json_serializer: Callable[[_JSON_VALUE], str] | None
+
+ _json_deserializer: Callable[[str], _JSON_VALUE] | None
+
+ supports_native_json_serialization: bool
+ """target dialect includes a native JSON serializer, eliminating
+ the need to use json.dumps() for JSON data
+
+ .. versionadded:: 2.1
+
+ """
+
+ supports_native_json_deserialization: bool
+ """target dialect includes a native JSON deserializer, eliminating
+ the need to use json.loads() for JSON data
+
+ .. versionadded:: 2.1
+
+ """
+
+ dialect_injects_custom_json_deserializer: bool
+ """target dialect, when given a custom _json_deserializer, needs to
+ inject this handler at the connection/cursor level, rather than
+ having JSON data returned as a string to be handled by the type
+
+    .. versionadded:: 2.1
+
+ """
+
aggregate_order_by_style: AggregateOrderByStyle
"""Style of ORDER BY supported for arbitrary aggregate functions
* Microsoft SQL Server 2016 and later - see
:class:`sqlalchemy.dialects.mssql.JSON` for backend-specific notes
+ * Oracle 21c and later - see :class:`sqlalchemy.dialects.oracle.JSON`
+ for backend-specific notes
+
:class:`_types.JSON` is part of the Core in support of the growing
popularity of native JSON datatypes.
:class:`sqlalchemy.dialects.sqlite.JSON`
+ :class:`sqlalchemy.dialects.oracle.JSON`
+
""" # noqa: E501
__visit_name__ = "JSON"
return process
def bind_processor(self, dialect):
+ if (
+ dialect._json_serializer is None
+ and dialect.supports_native_json_serialization
+ ):
+ return None
+
string_process = self._str_impl.bind_processor(dialect)
json_serializer = dialect._json_serializer or json.dumps
return self._make_bind_processor(string_process, json_serializer)
- def result_processor(self, dialect, coltype):
+ def result_processor(
+ self, dialect: Dialect, coltype: object
+ ) -> Optional[_ResultProcessorType[_T_JSON]]:
+
+ # note that for dialects that have native json deserialization,
+ # a custom deserializer function typically needs to be
+ # installed at the connection level, as an adapter, codec,
+ # or outputtypehandler, so return None here
+ if dialect.supports_native_json_deserialization and (
+ dialect._json_deserializer is None
+ or dialect.dialect_injects_custom_json_deserializer
+ ):
+ return None
+
string_process = self._str_impl.result_processor(dialect, coltype)
json_deserializer = dialect._json_deserializer or json.loads
)
-class JSONTest(_LiteralRoundTripFixture, fixtures.TablesTest):
+class JSONTest(fixtures.TablesTest):
__requires__ = ("json_type",)
__backend__ = True
Table(
"data_table",
metadata,
- Column("id", Integer, primary_key=True),
+ Column(
+ "id", Integer, primary_key=True, test_needs_autoincrement=True
+ ),
Column("name", String(30), nullable=False),
- Column("data", cls.datatype, nullable=False),
+ Column("data", cls.datatype(), nullable=False),
Column("nulldata", cls.datatype(none_as_null=True)),
)
select(data_table.c.data, data_table.c.nulldata)
).first()
- eq_(row, (data_element, data_element))
+ if isinstance(data_element, float):
+ c1, c2 = row
+ eq_((float(c1), float(c2)), (data_element, data_element))
+ else:
+ eq_(row, (data_element, data_element))
def test_round_trip_custom_json(self):
data_table = self.tables.data_table
eq_(row, (data_element,))
eq_(js.mock_calls, [mock.call(data_element)])
+
+ eq_(len(jd.mock_calls), 1)
+ eq_(len(jd.mock_calls[0].args), 1)
+
+ # oracledb's json outputtypehandler receives the json
+ # without spaces between the colons, so we have to normalize
+ # for the compare
+
if testing.requires.json_deserializer_binary.enabled:
- eq_(
- jd.mock_calls,
- [mock.call(json.dumps(data_element).encode())],
- )
+ json_str_given_to_adapter = jd.mock_calls[0].args[0].decode()
else:
- eq_(jd.mock_calls, [mock.call(json.dumps(data_element))])
+ json_str_given_to_adapter = jd.mock_calls[0].args[0]
+
+ eq_(
+ json.dumps(json.loads(json_str_given_to_adapter)),
+ json.dumps(data_element),
+ )
@testing.combinations(
("parameters",),
Table(
"data_table",
metadata,
- Column("id", Integer, primary_key=True),
+ Column(
+ "id", Integer, primary_key=True, test_needs_autoincrement=True
+ ),
Column("name", String(30), nullable=False),
Column("data", cls.datatype),
Column("nulldata", cls.datatype(none_as_null=True)),
import array
import datetime
import decimal
+import functools
import os
import random
from sqlalchemy.testing.engines import testing_engine
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
+from sqlalchemy.testing.suite import test_types as suite
from sqlalchemy.util import b
from sqlalchemy.util.concurrency import await_
start.dialect_impl(dialect), test
), "wanted %r got %r" % (test, start.dialect_impl(dialect))
+ @testing.variation(
+ "use_blob",
+ ["none", "true", "false", "dialect_support", "dialect_not_support"],
+ )
+ def test_json_types(self, use_blob):
+ if use_blob.none:
+ self.assert_compile(oracle.JSON(), "JSON")
+ elif use_blob.false:
+ self.assert_compile(oracle.JSON(use_blob=False), "JSON")
+ elif use_blob.true:
+ self.assert_compile(oracle.JSON(use_blob=True), "BLOB")
+ elif use_blob.dialect_support:
+ dialect = oracle.OracleDialect()
+ dialect._supports_oracle_json = True
+ self.assert_compile(oracle.JSON(), "JSON", dialect=dialect)
+
+ # test force override
+ self.assert_compile(
+ oracle.JSON(use_blob=True), "BLOB", dialect=dialect
+ )
+ elif use_blob.dialect_not_support:
+ dialect = oracle.OracleDialect()
+ dialect._supports_oracle_json = False
+ self.assert_compile(oracle.JSON(), "BLOB", dialect=dialect)
+
+ # test force override
+ self.assert_compile(
+ oracle.JSON(use_blob=False), "JSON", dialect=dialect
+ )
+
@testing.combinations(
(String(), String),
(VARCHAR(), cx_oracle._OracleString),
)
finally:
event.remove(testing.db, "do_setinputsizes", _remove_type)
+
+
+class JSONTest(fixtures.TestBase):
+ __requires__ = ("json_type",)
+ __only_on__ = "oracle"
+ __backend__ = True
+
+ @testing.requires.reflects_json_type
+ def test_reflection(self, metadata, connection):
+ Table("oracle_json", metadata, Column("foo", oracle.JSON))
+ metadata.create_all(connection)
+
+ reflected = Table("oracle_json", MetaData(), autoload_with=connection)
+ is_(reflected.c.foo.type._type_affinity, sqltypes.JSON)
+ assert isinstance(reflected.c.foo.type, oracle.JSON)
+
+ def test_rudimentary_round_trip(self, metadata, connection):
+ oracle_json = Table(
+ "oracle_json", metadata, Column("foo", oracle.JSON)
+ )
+ metadata.create_all(connection)
+
+ value = {"json": {"foo": "bar"}, "recs": ["one", "two"]}
+
+ connection.execute(oracle_json.insert(), dict(foo=value))
+
+ eq_(connection.scalar(select(oracle_json.c.foo)), value)
+
+ def test_extract_subobject(self, connection, metadata):
+ oracle_json = Table(
+ "oracle_json", metadata, Column("foo", oracle.JSON)
+ )
+ metadata.create_all(connection)
+
+ value = {"json": {"foo": "bar"}}
+ connection.execute(oracle_json.insert(), dict(foo=value))
+
+ eq_(
+ connection.scalar(select(oracle_json.c.foo["json"])),
+ value["json"],
+ )
+
+
+class JSONBlobSuiteTest(suite.JSONTest):
+ __only_on__ = "oracle+oracledb"
+
+ datatype = functools.partial(oracle.JSON, use_blob=True)
"postgresql >= 9.3",
self._sqlite_json,
"mssql",
+ "oracle>=21",
]
- )
+ ) + skip_if("oracle+cx_oracle")
@property
def json_index_supplementary_unicode_element(self):
and not config.db.dialect._is_mariadb,
"postgresql >= 9.3",
"sqlite >= 3.9",
+ "oracle>=21",
]
)
t = Table(
"t",
metadata,
- Column("id", Integer, primary_key=True),
+ Column(
+ "id", Integer, primary_key=True, test_needs_autoincrement=True
+ ),
Column("data", JsonDec),
)
t.create(connection)