:ticket:`4689`
+
+.. _change_1390:
+
+Support for SQL Regular Expression operators
+--------------------------------------------
+
+A long awaited feature to add rudimentary support for database regular
+expression operators, to complement the :meth:`_sql.ColumnOperators.like` and
+:meth:`_sql.ColumnOperators.match` suites of operations. The new features
+include :meth:`_sql.ColumnOperators.regexp_match` implementing a regular
+expression match like function, and :meth:`_sql.ColumnOperators.regexp_replace`
+implementing a regular expression string replace function.
+
+Supported backends include SQLite, PostgreSQL, MySQL / MariaDB, and Oracle.
+The SQLite backend only supports "regexp_match" but not "regexp_replace".
+
+The regular expression syntaxes and flags are **not backend agnostic**.
+A future feature will allow multiple regular expression syntaxes to be
+specified at once to switch between different backends on the fly.
+
+For SQLite, Python's ``re.match()`` function with no additional arguments
+is established as the implementation.
+
+.. seealso::
+
+
+ :meth:`_sql.ColumnOperators.regexp_match`
+
+ :meth:`_sql.ColumnOperators.regexp_replace`
+
+ :ref:`pysqlite_regexp` - SQLite implementation notes
+
+
+:ticket:`1390`
+
+
API and Behavioral Changes - Core
==================================
--- /dev/null
+.. change::
+ :tags: usecase, sql
+ :tickets: 1390
+
+ Add support for regular expression on supported backends.
+ Two operations have been defined:
+
+ * :meth:`_sql.ColumnOperators.regexp_match` implementing a regular
+ expression match like function.
+ * :meth:`_sql.ColumnOperators.regexp_replace` implementing a regular
+ expression string replace function.
+
+ Supported backends include SQLite, PostgreSQL, MySQL / MariaDB, and Oracle.
+
+ .. seealso::
+
+ :ref:`change_1390`
"sqlalchemy.sql.dml": "sqlalchemy.sql.expression",
"sqlalchemy.sql.ddl": "sqlalchemy.schema",
"sqlalchemy.sql.base": "sqlalchemy.sql.expression",
+ "sqlalchemy.sql.operators": "sqlalchemy.sql.expression",
"sqlalchemy.event.base": "sqlalchemy.event",
"sqlalchemy.engine.base": "sqlalchemy.engine",
"sqlalchemy.engine.url": "sqlalchemy.engine",
.. _identity_ddl:
Identity Columns (GENERATED { ALWAYS | BY DEFAULT } AS IDENTITY)
---------------------------------------------------------------
+-----------------------------------------------------------------
.. versionadded:: 1.4
.. autofunction:: cast
-.. autofunction:: sqlalchemy.sql.expression.column
+.. autofunction:: column
.. autofunction:: collate
:inherited-members:
:undoc-members:
-.. autoclass:: sqlalchemy.sql.operators.ColumnOperators
+.. autoclass:: ColumnOperators
:members:
:special-members:
:inherited-members:
.. autoclass:: Extract
:members:
-.. autoclass:: sqlalchemy.sql.elements.False_
+.. autoclass:: False_
:members:
.. autoclass:: FunctionFilter
.. autoclass:: LambdaElement
:members:
-.. autoclass:: sqlalchemy.sql.elements.Null
+.. autoclass:: Null
:members:
.. autoclass:: Over
.. autoclass:: sqlalchemy.sql.elements.WrapsColumnExpression
:members:
-.. autoclass:: sqlalchemy.sql.elements.True_
+.. autoclass:: True_
:members:
.. autoclass:: TypeCoerce
:members:
-.. autoclass:: sqlalchemy.sql.operators.custom_op
+.. autoclass:: custom_op
:members:
-.. autoclass:: sqlalchemy.sql.operators.Operators
+.. autoclass:: Operators
:members:
:special-members:
-.. autoclass:: sqlalchemy.sql.elements.quoted_name
+.. autoclass:: quoted_name
.. attribute:: quote
self.process(binary.right),
)
+ def _mariadb_regexp_flags(self, flags, pattern, **kw):
+ return "CONCAT('(?', %s, ')', %s)" % (
+ self.process(flags, **kw),
+ self.process(pattern, **kw),
+ )
+
+ def _regexp_match(self, op_string, binary, operator, **kw):
+ flags = binary.modifiers["flags"]
+ if flags is None:
+ return self._generate_generic_binary(binary, op_string, **kw)
+ elif self.dialect.is_mariadb:
+ return "%s%s%s" % (
+ self.process(binary.left, **kw),
+ op_string,
+ self._mariadb_regexp_flags(flags, binary.right),
+ )
+ else:
+ text = "REGEXP_LIKE(%s, %s, %s)" % (
+ self.process(binary.left, **kw),
+ self.process(binary.right, **kw),
+ self.process(flags, **kw),
+ )
+ if op_string == " NOT REGEXP ":
+ return "NOT %s" % text
+ else:
+ return text
+
+ def visit_regexp_match_op_binary(self, binary, operator, **kw):
+ return self._regexp_match(" REGEXP ", binary, operator, **kw)
+
+ def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
+ return self._regexp_match(" NOT REGEXP ", binary, operator, **kw)
+
+ def visit_regexp_replace_op_binary(self, binary, operator, **kw):
+ flags = binary.modifiers["flags"]
+ replacement = binary.modifiers["replacement"]
+ if flags is None:
+ return "REGEXP_REPLACE(%s, %s, %s)" % (
+ self.process(binary.left, **kw),
+ self.process(binary.right, **kw),
+ self.process(replacement, **kw),
+ )
+ elif self.dialect.is_mariadb:
+ return "REGEXP_REPLACE(%s, %s, %s)" % (
+ self.process(binary.left, **kw),
+ self._mariadb_regexp_flags(flags, binary.right),
+ self.process(replacement, **kw),
+ )
+ else:
+ return "REGEXP_REPLACE(%s, %s, %s, %s)" % (
+ self.process(binary.left, **kw),
+ self.process(binary.right, **kw),
+ self.process(replacement, **kw),
+ self.process(flags, **kw),
+ )
+
class MySQLDDLCompiler(compiler.DDLCompiler):
def get_column_specification(self, column, **kw):
or the association of a SEQUENCE with the column.
Specifying GENERATED AS IDENTITY (Oracle 12 and above)
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Starting from version 12 Oracle can make use of identity columns using
the :class:`_sql.Identity` to specify the autoincrementing behavior::
in conjunction with a 'BY DEFAULT' identity column.
Using a SEQUENCE (all Oracle versions)
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Older version of Oracle had no "autoincrement"
feature, SQLAlchemy relies upon sequences to produce these values. With the
self.process(binary.right),
)
+ def _get_regexp_args(self, binary, kw):
+ string = self.process(binary.left, **kw)
+ pattern = self.process(binary.right, **kw)
+ flags = binary.modifiers["flags"]
+ if flags is not None:
+ flags = self.process(flags, **kw)
+ return string, pattern, flags
+
+ def visit_regexp_match_op_binary(self, binary, operator, **kw):
+ string, pattern, flags = self._get_regexp_args(binary, kw)
+ if flags is None:
+ return "REGEXP_LIKE(%s, %s)" % (string, pattern)
+ else:
+ return "REGEXP_LIKE(%s, %s, %s)" % (string, pattern, flags)
+
+ def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
+ return "NOT %s" % self.visit_regexp_match_op_binary(
+ binary, operator, **kw
+ )
+
+ def visit_regexp_replace_op_binary(self, binary, operator, **kw):
+ string, pattern, flags = self._get_regexp_args(binary, kw)
+ replacement = self.process(binary.modifiers["replacement"], **kw)
+ if flags is None:
+ return "REGEXP_REPLACE(%s, %s, %s)" % (
+ string,
+ pattern,
+ replacement,
+ )
+ else:
+ return "REGEXP_REPLACE(%s, %s, %s, %s)" % (
+ string,
+ pattern,
+ replacement,
+ flags,
+ )
+
class OracleDDLCompiler(compiler.DDLCompiler):
def define_constraint_cascades(self, constraint):
else ""
)
+ def _regexp_match(self, base_op, binary, operator, kw):
+ flags = binary.modifiers["flags"]
+ if flags is None:
+ return self._generate_generic_binary(
+ binary, " %s " % base_op, **kw
+ )
+ if isinstance(flags, elements.BindParameter) and flags.value == "i":
+ return self._generate_generic_binary(
+ binary, " %s* " % base_op, **kw
+ )
+ flags = self.process(flags, **kw)
+ string = self.process(binary.left, **kw)
+ pattern = self.process(binary.right, **kw)
+ return "%s %s CONCAT('(?', %s, ')', %s)" % (
+ string,
+ base_op,
+ flags,
+ pattern,
+ )
+
+ def visit_regexp_match_op_binary(self, binary, operator, **kw):
+ return self._regexp_match("~", binary, operator, kw)
+
+ def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
+ return self._regexp_match("!~", binary, operator, kw)
+
+ def visit_regexp_replace_op_binary(self, binary, operator, **kw):
+ string = self.process(binary.left, **kw)
+ pattern = self.process(binary.right, **kw)
+ flags = binary.modifiers["flags"]
+ if flags is not None:
+ flags = self.process(flags, **kw)
+ replacement = self.process(binary.modifiers["replacement"], **kw)
+ if flags is None:
+ return "REGEXP_REPLACE(%s, %s, %s)" % (
+ string,
+ pattern,
+ replacement,
+ )
+ else:
+ return "REGEXP_REPLACE(%s, %s, %s, %s)" % (
+ string,
+ pattern,
+ replacement,
+ flags,
+ )
+
def visit_empty_set_expr(self, element_types):
# cast the empty set to the type we are comparing against. if
# we are comparing against the null type, pick an arbitrary
", ".join("1" for type_ in element_types or [INTEGER()]),
)
+ def visit_regexp_match_op_binary(self, binary, operator, **kw):
+ return self._generate_generic_binary(binary, " REGEXP ", **kw)
+
+ def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
+ return self._generate_generic_binary(binary, " NOT REGEXP ", **kw)
+
class SQLiteDDLCompiler(compiler.DDLCompiler):
def get_column_specification(self, column, **kwargs):
`Uniform Resource Identifiers <https://www.sqlite.org/uri.html>`_ - in
the SQLite documentation
+.. _pysqlite_regexp:
+
+Regular Expression Support
+---------------------------
+
+.. versionadded:: 1.4
+
+Support for the :meth:`_sql.ColumnOperators.regexp_match` operator is provided
+using Python's re.match_ function. SQLite itself does not include a working
+regular expression operator; instead, it includes a non-implemented placeholder
+operator ``REGEXP`` that calls a user-defined function that must be provided.
+
+SQLAlchemy's implementation makes use of the pysqlite create_function_ hook
+as follows::
+
+
+ def regexp(a, b):
+ return bool(re.match(a, b))
+
+ sqlite_connection.create_function(
+ "regexp", 2, regexp,
+ )
+
+There is currently no support for regular expression flags as a separate
+argument, as these are not supported by SQLite's REGEXP operator, however these
+may be included inline within the regular expression string. See `Python regular expressions`_ for
+details.
+
+.. seealso::
+
+ `Python regular expressions`_: Documentation for Python's regular expression syntax.
+
+.. _create_function: https://docs.python.org/3/library/sqlite3.html#sqlite3.Connection.create_function
+
+.. _re.match: https://docs.python.org/3/library/re.html#re.match
+
+.. _Python regular expressions: https://docs.python.org/3/library/re.html#re.match
+
+
+
Compatibility with sqlite3 "native" date and datetime types
-----------------------------------------------------------
""" # noqa
import os
+import re
from .base import DATE
from .base import DATETIME
connection, level
)
+ def on_connect(self):
+ connect = super(SQLiteDialect_pysqlite, self).on_connect()
+
+ def regexp(a, b):
+ return bool(re.match(a, b))
+
+ def set_regexp(connection):
+ if hasattr(connection, "connection"):
+ dbapi_connection = connection.connection
+ else:
+ dbapi_connection = connection
+
+ dbapi_connection.create_function(
+ "regexp", 2, regexp,
+ )
+
+ fns = [set_regexp]
+
+ if self.isolation_level is not None:
+
+ def iso_level(conn):
+ self.set_isolation_level(conn, self.isolation_level)
+
+ fns.append(iso_level)
+
+ def connect(conn):
+ for fn in fns:
+ fn(conn)
+
+ return connect
+
def create_connect_args(self, url):
if url.username or url.password or url.host or url.port:
raise exc.ArgumentError(
**kw
)
+ def visit_regexp_match_op_binary(self, binary, operator, **kw):
+ raise exc.CompileError(
+ "%s dialect does not support regular expressions"
+ % self.dialect.name
+ )
+
+ def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
+ raise exc.CompileError(
+ "%s dialect does not support regular expressions"
+ % self.dialect.name
+ )
+
+ def visit_regexp_replace_op_binary(self, binary, operator, **kw):
+ raise exc.CompileError(
+ "%s dialect does not support regular expression replacements"
+ % self.dialect.name
+ )
+
def visit_bindparam(
self,
bindparam,
def get_from_hint_text(self, table, text):
return "[%s]" % text
+ def visit_regexp_match_op_binary(self, binary, operator, **kw):
+ return self._generate_generic_binary(binary, " <regexp> ", **kw)
+
+ def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
+ return self._generate_generic_binary(binary, " <not regexp> ", **kw)
+
+ def visit_regexp_replace_op_binary(self, binary, operator, **kw):
+ replacement = binary.modifiers["replacement"]
+ return "<regexp replace>(%s, %s, %s)" % (
+ binary.left._compiler_dispatch(self, **kw),
+ binary.right._compiler_dispatch(self, **kw),
+ replacement._compiler_dispatch(self, **kw),
+ )
+
class DDLCompiler(Compiled):
@util.memoized_property
return collate(expr, other)
+def _regexp_match_impl(expr, op, pattern, flags, **kw):
+ if flags is not None:
+ flags = coercions.expect(
+ roles.BinaryElementRole,
+ flags,
+ expr=expr,
+ operator=operators.regexp_replace_op,
+ )
+ return _boolean_compare(
+ expr,
+ op,
+ pattern,
+ flags=flags,
+ negate=operators.not_regexp_match_op
+ if op is operators.regexp_match_op
+ else operators.regexp_match_op,
+ **kw
+ )
+
+
+def _regexp_replace_impl(expr, op, pattern, replacement, flags, **kw):
+ replacement = coercions.expect(
+ roles.BinaryElementRole,
+ replacement,
+ expr=expr,
+ operator=operators.regexp_replace_op,
+ )
+ if flags is not None:
+ flags = coercions.expect(
+ roles.BinaryElementRole,
+ flags,
+ expr=expr,
+ operator=operators.regexp_replace_op,
+ )
+ return _binary_operate(
+ expr, op, pattern, replacement=replacement, flags=flags, **kw
+ )
+
+
# a mapping of operators with the method they use, along with
# their negated operator for comparison operators
operator_lookup = {
"lshift": (_unsupported_impl,),
"rshift": (_unsupported_impl,),
"contains": (_unsupported_impl,),
+ "regexp_match_op": (_regexp_match_impl,),
+ "not_regexp_match_op": (_regexp_match_impl,),
+ "regexp_replace_op": (_regexp_replace_impl,),
}
"case",
"cast",
"column",
+ "custom_op",
"cte",
"delete",
"desc",
"union",
"union_all",
"update",
+ "quoted_name",
"within_group",
"Subquery",
"TableSample",
from .lambdas import lambda_stmt # noqa
from .lambdas import LambdaElement # noqa
from .lambdas import StatementLambdaElement # noqa
+from .operators import ColumnOperators # noqa
+from .operators import custom_op # noqa
+from .operators import Operators # noqa
from .selectable import Alias # noqa
from .selectable import AliasedReturnsRows # noqa
from .selectable import CompoundSelect # noqa
def match(self, other, **kwargs):
"""Implements a database-specific 'match' operator.
- :meth:`~.ColumnOperators.match` attempts to resolve to
+ :meth:`_sql.ColumnOperators.match` attempts to resolve to
a MATCH-like function or operator provided by the backend.
Examples include:
"""
return self.operate(match_op, other, **kwargs)
+ def regexp_match(self, pattern, flags=None):
+ """Implements a database-specific 'regexp match' operator.
+
+ E.g.::
+
+ stmt = select(table.c.some_column).where(
+ table.c.some_column.regexp_match('^(b|c)')
+ )
+
+ :meth:`_sql.ColumnOperators.regexp_match` attempts to resolve to
+ a REGEXP-like function or operator provided by the backend, however
+ the specific regular expression syntax and flags available are
+ **not backend agnostic**.
+
+ Examples include:
+
+ * PostgreSQL - renders ``x ~ y`` or ``x !~ y`` when negated.
+ * Oracle - renders ``REGEXP_LIKE(x, y)``
+ * SQLite - uses SQLite's ``REGEXP`` placeholder operator and calls into
+ the Python ``re.match()`` builtin.
+ * other backends may provide special implementations.
+ * Backends without any special implementation will emit
+ the operator as "REGEXP" or "NOT REGEXP". This is compatible with
+ SQLite and MySQL, for example.
+
+ Regular expression support is currently implemented for Oracle,
+ PostgreSQL, MySQL and MariaDB. Partial support is available for
+ SQLite. Support among third-party dialects may vary.
+
+ :param pattern: The regular expression pattern string or column
+ clause.
+ :param flags: Any regular expression string flags to apply. Flags
+ tend to be backend specific. It can be a string or a column clause.
+ Some backends, like PostgreSQL and MariaDB, may alternatively
+ specify the flags as part of the pattern.
+ When using the ignore case flag 'i' in PostgreSQL, the ignore case
+ regexp match operator ``~*`` or ``!~*`` will be used.
+
+ .. versionadded:: 1.4
+
+ .. seealso::
+
+ :meth:`_sql.ColumnOperators.regexp_replace`
+
+
+ """
+ return self.operate(regexp_match_op, pattern, flags=flags)
+
+ def regexp_replace(self, pattern, replacement, flags=None):
+ """Implements a database-specific 'regexp replace' operator.
+
+ E.g.::
+
+ stmt = select(
+ table.c.some_column.regexp_replace(
+ 'b(..)',
+ 'X\1Y',
+ flags='g'
+ )
+ )
+
+ :meth:`_sql.ColumnOperators.regexp_replace` attempts to resolve to
+ a REGEXP_REPLACE-like function provided by the backend, that
+ usually emit the function ``REGEXP_REPLACE()``. However,
+ the specific regular expression syntax and flags available are
+ **not backend agnostic**.
+
+ Regular expression replacement support is currently implemented for
+ Oracle, PostgreSQL, MySQL 8 or greater and MariaDB. Support among
+ third-party dialects may vary.
+
+ :param pattern: The regular expression pattern string or column
+ clause.
+ :param pattern: The replacement string or column clause.
+ :param flags: Any regular expression string flags to apply. Flags
+ tend to be backend specific. It can be a string or a column clause.
+ Some backends, like PostgreSQL and MariaDB, may alternatively
+ specify the flags as part of the pattern.
+
+ .. versionadded:: 1.4
+
+ .. seealso::
+
+ :meth:`_sql.ColumnOperators.regexp_match`
+
+ """
+ return self.operate(
+ regexp_replace_op, pattern, replacement=replacement, flags=flags
+ )
+
def desc(self):
"""Produce a :func:`_expression.desc` clause against the
parent object."""
return a.match(b, **kw)
+@comparison_op
+def regexp_match_op(a, b, flags=None):
+ return a.regexp_match(b, flags=flags)
+
+
+@comparison_op
+def not_regexp_match_op(a, b, flags=None):
+ return ~a.regexp_match(b, flags=flags)
+
+
+def regexp_replace_op(a, b, replacement, flags=None):
+ return a.regexp_replace(b, replacement=replacement, flags=flags)
+
+
@comparison_op
def notmatch_op(a, b, **kw):
return a.notmatch(b, **kw)
filter_op: 6,
match_op: 5,
notmatch_op: 5,
+ regexp_match_op: 5,
+ not_regexp_match_op: 5,
+ regexp_replace_op: 5,
ilike_op: 5,
notilike_op: 5,
like_op: 5,
This is mainly to exclude MSSql.
"""
return exclusions.closed()
+
+ @property
+ def regexp_match(self):
+ """backend supports the regexp_match operator.
+
+ .. versionadded:: 1.4
+
+ """
+ return exclusions.closed()
+
+ @property
+ def regexp_replace(self):
+ """backend supports the regexp_replace operator.
+
+ .. versionadded:: 1.4
+
+
+ """
+ return exclusions.closed()
self._test(col.contains("b%cd", autoescape=True, escape="#"), {3})
self._test(col.contains("b#cd", autoescape=True, escape="#"), {7})
+ @testing.requires.regexp_match
+ def test_regexp_match(self):
+ col = self.tables.some_table.c.data
+ self._test(col.regexp_match("a.cde"), {1, 5, 6, 9})
+
+ @testing.requires.regexp_match
+ def test_not_regexp_match(self):
+ col = self.tables.some_table.c.data
+ self._test(~col.regexp_match("a.cde"), {2, 3, 4, 7, 8, 10})
+
+ @testing.requires.regexp_replace
+ def test_regexp_replace(self):
+ col = self.tables.some_table.c.data
+ self._test(
+ col.regexp_replace("a.cde", "FOO").contains("FOO"), {1, 5, 6, 9}
+ )
+
class ComputedColumnTest(fixtures.TablesTest):
__backend__ = True
"baz_1": "some literal",
},
)
+
+
+class RegexpCommon(testing.AssertsCompiledSQL):
+ def setUp(self):
+ self.table = table(
+ "mytable", column("myid", Integer), column("name", String)
+ )
+
+ def test_regexp_match(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_match("pattern"),
+ "mytable.myid REGEXP %s",
+ checkpositional=("pattern",),
+ )
+
+ def test_regexp_match_column(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_match(self.table.c.name),
+ "mytable.myid REGEXP mytable.name",
+ checkpositional=(),
+ )
+
+ def test_regexp_match_str(self):
+ self.assert_compile(
+ literal("string").regexp_match(self.table.c.name),
+ "%s REGEXP mytable.name",
+ checkpositional=("string",),
+ )
+
+ def test_not_regexp_match(self):
+ self.assert_compile(
+ ~self.table.c.myid.regexp_match("pattern"),
+ "mytable.myid NOT REGEXP %s",
+ checkpositional=("pattern",),
+ )
+
+ def test_not_regexp_match_column(self):
+ self.assert_compile(
+ ~self.table.c.myid.regexp_match(self.table.c.name),
+ "mytable.myid NOT REGEXP mytable.name",
+ checkpositional=(),
+ )
+
+ def test_not_regexp_match_str(self):
+ self.assert_compile(
+ ~literal("string").regexp_match(self.table.c.name),
+ "%s NOT REGEXP mytable.name",
+ checkpositional=("string",),
+ )
+
+ def test_regexp_replace(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_replace("pattern", "replacement"),
+ "REGEXP_REPLACE(mytable.myid, %s, %s)",
+ checkpositional=("pattern", "replacement"),
+ )
+
+ def test_regexp_replace_column(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_replace("pattern", self.table.c.name),
+ "REGEXP_REPLACE(mytable.myid, %s, mytable.name)",
+ checkpositional=("pattern",),
+ )
+
+ def test_regexp_replace_column2(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_replace(self.table.c.name, "replacement"),
+ "REGEXP_REPLACE(mytable.myid, mytable.name, %s)",
+ checkpositional=("replacement",),
+ )
+
+ def test_regexp_replace_string(self):
+ self.assert_compile(
+ literal("string").regexp_replace("pattern", self.table.c.name),
+ "REGEXP_REPLACE(%s, %s, mytable.name)",
+ checkpositional=("string", "pattern"),
+ )
+
+
+class RegexpTestMySql(fixtures.TestBase, RegexpCommon):
+ __dialect__ = "mysql"
+
+ def test_regexp_match_flags(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_match("pattern", flags="ig"),
+ "REGEXP_LIKE(mytable.myid, %s, %s)",
+ checkpositional=("pattern", "ig"),
+ )
+
+ def test_not_regexp_match_flags(self):
+ self.assert_compile(
+ ~self.table.c.myid.regexp_match("pattern", flags="ig"),
+ "NOT REGEXP_LIKE(mytable.myid, %s, %s)",
+ checkpositional=("pattern", "ig"),
+ )
+
+ def test_regexp_replace_flags(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_replace(
+ "pattern", "replacement", flags="ig"
+ ),
+ "REGEXP_REPLACE(mytable.myid, %s, %s, %s)",
+ checkpositional=("pattern", "replacement", "ig"),
+ )
+
+
+class RegexpTestMariaDb(fixtures.TestBase, RegexpCommon):
+ __dialect__ = "mariadb"
+
+ def test_regexp_match_flags(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_match("pattern", flags="ig"),
+ "mytable.myid REGEXP CONCAT('(?', %s, ')', %s)",
+ checkpositional=("ig", "pattern"),
+ )
+
+ def test_not_regexp_match_flags(self):
+ self.assert_compile(
+ ~self.table.c.myid.regexp_match("pattern", flags="ig"),
+ "mytable.myid NOT REGEXP CONCAT('(?', %s, ')', %s)",
+ checkpositional=("ig", "pattern"),
+ )
+
+ def test_regexp_replace_flags(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_replace(
+ "pattern", "replacement", flags="ig"
+ ),
+ "REGEXP_REPLACE(mytable.myid, CONCAT('(?', %s, ')', %s), %s)",
+ checkpositional=("ig", "pattern", "replacement"),
+ )
dialect.identifier_preparer.format_sequence(seq)
== '"Some_Schema"."My_Seq"'
)
+
+
+class RegexpTest(fixtures.TestBase, testing.AssertsCompiledSQL):
+ __dialect__ = "oracle"
+
+ def setUp(self):
+ self.table = table(
+ "mytable", column("myid", Integer), column("name", String)
+ )
+
+ def test_regexp_match(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_match("pattern"),
+ "REGEXP_LIKE(mytable.myid, :myid_1)",
+ checkparams={"myid_1": "pattern"},
+ )
+
+ def test_regexp_match_column(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_match(self.table.c.name),
+ "REGEXP_LIKE(mytable.myid, mytable.name)",
+ checkparams={},
+ )
+
+ def test_regexp_match_str(self):
+ self.assert_compile(
+ literal("string").regexp_match(self.table.c.name),
+ "REGEXP_LIKE(:param_1, mytable.name)",
+ checkparams={"param_1": "string"},
+ )
+
+ def test_regexp_match_flags(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_match("pattern", flags="ig"),
+ "REGEXP_LIKE(mytable.myid, :myid_1, :myid_2)",
+ checkparams={"myid_1": "pattern", "myid_2": "ig"},
+ )
+
+ def test_regexp_match_flags_col(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_match("pattern", flags=self.table.c.name),
+ "REGEXP_LIKE(mytable.myid, :myid_1, mytable.name)",
+ checkparams={"myid_1": "pattern"},
+ )
+
+ def test_not_regexp_match(self):
+ self.assert_compile(
+ ~self.table.c.myid.regexp_match("pattern"),
+ "NOT REGEXP_LIKE(mytable.myid, :myid_1)",
+ checkparams={"myid_1": "pattern"},
+ )
+
+ def test_not_regexp_match_column(self):
+ self.assert_compile(
+ ~self.table.c.myid.regexp_match(self.table.c.name),
+ "NOT REGEXP_LIKE(mytable.myid, mytable.name)",
+ checkparams={},
+ )
+
+ def test_not_regexp_match_str(self):
+ self.assert_compile(
+ ~literal("string").regexp_match(self.table.c.name),
+ "NOT REGEXP_LIKE(:param_1, mytable.name)",
+ checkparams={"param_1": "string"},
+ )
+
+ def test_not_regexp_match_flags_col(self):
+ self.assert_compile(
+ ~self.table.c.myid.regexp_match(
+ "pattern", flags=self.table.c.name
+ ),
+ "NOT REGEXP_LIKE(mytable.myid, :myid_1, mytable.name)",
+ checkparams={"myid_1": "pattern"},
+ )
+
+ def test_not_regexp_match_flags(self):
+ self.assert_compile(
+ ~self.table.c.myid.regexp_match("pattern", flags="ig"),
+ "NOT REGEXP_LIKE(mytable.myid, :myid_1, :myid_2)",
+ checkparams={"myid_1": "pattern", "myid_2": "ig"},
+ )
+
+ def test_regexp_replace(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_replace("pattern", "replacement"),
+ "REGEXP_REPLACE(mytable.myid, :myid_1, :myid_2)",
+ checkparams={"myid_1": "pattern", "myid_2": "replacement"},
+ )
+
+ def test_regexp_replace_column(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_replace("pattern", self.table.c.name),
+ "REGEXP_REPLACE(mytable.myid, :myid_1, mytable.name)",
+ checkparams={"myid_1": "pattern"},
+ )
+
+ def test_regexp_replace_column2(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_replace(self.table.c.name, "replacement"),
+ "REGEXP_REPLACE(mytable.myid, mytable.name, :myid_1)",
+ checkparams={"myid_1": "replacement"},
+ )
+
+ def test_regexp_replace_string(self):
+ self.assert_compile(
+ literal("string").regexp_replace("pattern", self.table.c.name),
+ "REGEXP_REPLACE(:param_1, :param_2, mytable.name)",
+ checkparams={"param_2": "pattern", "param_1": "string"},
+ )
+
+ def test_regexp_replace_flags(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_replace(
+ "pattern", "replacement", flags="ig"
+ ),
+ "REGEXP_REPLACE(mytable.myid, :myid_1, :myid_3, :myid_2)",
+ checkparams={
+ "myid_1": "pattern",
+ "myid_3": "replacement",
+ "myid_2": "ig",
+ },
+ )
+
+ def test_regexp_replace_flags_col(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_replace(
+ "pattern", "replacement", flags=self.table.c.name
+ ),
+ "REGEXP_REPLACE(mytable.myid, :myid_1, :myid_2, mytable.name)",
+ checkparams={"myid_1": "pattern", "myid_2": "replacement"},
+ )
from sqlalchemy import Identity
from sqlalchemy import Index
from sqlalchemy import Integer
+from sqlalchemy import literal
from sqlalchemy import MetaData
from sqlalchemy import null
from sqlalchemy import schema
"WHERE to_tsvector(%(to_tsvector_1)s, mytable.title) @@ "
"""to_tsquery('english', %(to_tsvector_2)s)""",
)
+
+
+class RegexpTest(fixtures.TestBase, testing.AssertsCompiledSQL):
+ __dialect__ = "postgresql"
+
+ def setUp(self):
+ self.table = table(
+ "mytable", column("myid", Integer), column("name", String)
+ )
+
+ def test_regexp_match(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_match("pattern"),
+ "mytable.myid ~ %(myid_1)s",
+ checkparams={"myid_1": "pattern"},
+ )
+
+ def test_regexp_match_column(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_match(self.table.c.name),
+ "mytable.myid ~ mytable.name",
+ checkparams={},
+ )
+
+ def test_regexp_match_str(self):
+ self.assert_compile(
+ literal("string").regexp_match(self.table.c.name),
+ "%(param_1)s ~ mytable.name",
+ checkparams={"param_1": "string"},
+ )
+
+ def test_regexp_match_flags(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_match("pattern", flags="ig"),
+ "mytable.myid ~ CONCAT('(?', %(myid_1)s, ')', %(myid_2)s)",
+ checkparams={"myid_2": "pattern", "myid_1": "ig"},
+ )
+
+ def test_regexp_match_flags_ignorecase(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_match("pattern", flags="i"),
+ "mytable.myid ~* %(myid_1)s",
+ checkparams={"myid_1": "pattern"},
+ )
+
+ def test_regexp_match_flags_col(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_match("pattern", flags=self.table.c.name),
+ "mytable.myid ~ CONCAT('(?', mytable.name, ')', %(myid_1)s)",
+ checkparams={"myid_1": "pattern"},
+ )
+
+ def test_not_regexp_match(self):
+ self.assert_compile(
+ ~self.table.c.myid.regexp_match("pattern"),
+ "mytable.myid !~ %(myid_1)s",
+ checkparams={"myid_1": "pattern"},
+ )
+
+ def test_not_regexp_match_column(self):
+ self.assert_compile(
+ ~self.table.c.myid.regexp_match(self.table.c.name),
+ "mytable.myid !~ mytable.name",
+ checkparams={},
+ )
+
+ def test_not_regexp_match_str(self):
+ self.assert_compile(
+ ~literal("string").regexp_match(self.table.c.name),
+ "%(param_1)s !~ mytable.name",
+ checkparams={"param_1": "string"},
+ )
+
+ def test_not_regexp_match_flags(self):
+ self.assert_compile(
+ ~self.table.c.myid.regexp_match("pattern", flags="ig"),
+ "mytable.myid !~ CONCAT('(?', %(myid_1)s, ')', %(myid_2)s)",
+ checkparams={"myid_2": "pattern", "myid_1": "ig"},
+ )
+
+ def test_not_regexp_match_flags_ignorecase(self):
+ self.assert_compile(
+ ~self.table.c.myid.regexp_match("pattern", flags="i"),
+ "mytable.myid !~* %(myid_1)s",
+ checkparams={"myid_1": "pattern"},
+ )
+
+ def test_not_regexp_match_flags_col(self):
+ self.assert_compile(
+ ~self.table.c.myid.regexp_match(
+ "pattern", flags=self.table.c.name
+ ),
+ "mytable.myid !~ CONCAT('(?', mytable.name, ')', %(myid_1)s)",
+ checkparams={"myid_1": "pattern"},
+ )
+
+ def test_regexp_replace(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_replace("pattern", "replacement"),
+ "REGEXP_REPLACE(mytable.myid, %(myid_1)s, %(myid_2)s)",
+ checkparams={"myid_1": "pattern", "myid_2": "replacement"},
+ )
+
+ def test_regexp_replace_column(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_replace("pattern", self.table.c.name),
+ "REGEXP_REPLACE(mytable.myid, %(myid_1)s, mytable.name)",
+ checkparams={"myid_1": "pattern"},
+ )
+
+ def test_regexp_replace_column2(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_replace(self.table.c.name, "replacement"),
+ "REGEXP_REPLACE(mytable.myid, mytable.name, %(myid_1)s)",
+ checkparams={"myid_1": "replacement"},
+ )
+
+ def test_regexp_replace_string(self):
+ self.assert_compile(
+ literal("string").regexp_replace("pattern", self.table.c.name),
+ "REGEXP_REPLACE(%(param_1)s, %(param_2)s, mytable.name)",
+ checkparams={"param_2": "pattern", "param_1": "string"},
+ )
+
+ def test_regexp_replace_flags(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_replace(
+ "pattern", "replacement", flags="ig"
+ ),
+ "REGEXP_REPLACE(mytable.myid, %(myid_1)s, %(myid_3)s, %(myid_2)s)",
+ checkparams={
+ "myid_1": "pattern",
+ "myid_3": "replacement",
+ "myid_2": "ig",
+ },
+ )
+
+ def test_regexp_replace_flags_col(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_replace(
+ "pattern", "replacement", flags=self.table.c.name
+ ),
+ "REGEXP_REPLACE(mytable.myid, %(myid_1)s,"
+ " %(myid_2)s, mytable.name)",
+ checkparams={"myid_1": "pattern", "myid_2": "replacement"},
+ )
from sqlalchemy import func
from sqlalchemy import Index
from sqlalchemy import inspect
+from sqlalchemy import literal
from sqlalchemy import MetaData
from sqlalchemy import pool
from sqlalchemy import PrimaryKeyConstraint
from sqlalchemy import select
from sqlalchemy import sql
from sqlalchemy import Table
+from sqlalchemy import table
from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy import tuple_
def test_round_trip_direct_type_affinity(self):
self._test_round_trip(self._type_affinity_fixture())
+
+
+class RegexpTest(fixtures.TestBase, testing.AssertsCompiledSQL):
+ __dialect__ = "sqlite"
+
+ def setUp(self):
+ self.table = table(
+ "mytable", column("myid", Integer), column("name", String)
+ )
+
+ def test_regexp_match(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_match("pattern"),
+ "mytable.myid REGEXP ?",
+ checkpositional=("pattern",),
+ )
+
+ def test_regexp_match_column(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_match(self.table.c.name),
+ "mytable.myid REGEXP mytable.name",
+ checkparams={},
+ )
+
+ def test_regexp_match_str(self):
+ self.assert_compile(
+ literal("string").regexp_match(self.table.c.name),
+ "? REGEXP mytable.name",
+ checkpositional=("string",),
+ )
+
+ def test_regexp_match_flags(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_match("pattern", flags="ig"),
+ "mytable.myid REGEXP ?",
+ checkpositional=("pattern",),
+ )
+
+ def test_not_regexp_match(self):
+ self.assert_compile(
+ ~self.table.c.myid.regexp_match("pattern"),
+ "mytable.myid NOT REGEXP ?",
+ checkpositional=("pattern",),
+ )
+
+ def test_not_regexp_match_flags(self):
+ self.assert_compile(
+ ~self.table.c.myid.regexp_match("pattern", flags="ig"),
+ "mytable.myid NOT REGEXP ?",
+ checkpositional=("pattern",),
+ )
+
+ def test_not_regexp_match_column(self):
+ self.assert_compile(
+ ~self.table.c.myid.regexp_match(self.table.c.name),
+ "mytable.myid NOT REGEXP mytable.name",
+ checkparams={},
+ )
+
+ def test_not_regexp_match_str(self):
+ self.assert_compile(
+ ~literal("string").regexp_match(self.table.c.name),
+ "? NOT REGEXP mytable.name",
+ checkpositional=("string",),
+ )
+
+ def test_regexp_replace(self):
+ assert_raises_message(
+ exc.CompileError,
+ "sqlite dialect does not support regular expression replacements",
+ self.table.c.myid.regexp_replace("pattern", "rep").compile,
+ dialect=sqlite.dialect(),
+ )
def computed_columns_reflect_persisted(self):
return self.computed_columns + skip_if("oracle")
+ @property
+ def regexp_match(self):
+ return only_on(["postgresql", "mysql", "mariadb", "oracle", "sqlite"])
+
+ @property
+ def regexp_replace(self):
+ return only_on(["postgresql", "mysql>=8", "mariadb", "oracle"])
+
@property
def supports_distinct_on(self):
"""If a backend supports the DISTINCT ON in a select"""
)
+class RegexpTest(fixtures.TestBase, testing.AssertsCompiledSQL):
+ __dialect__ = "default"
+
+ def setUp(self):
+ self.table = table(
+ "mytable", column("myid", Integer), column("name", String)
+ )
+
+ def test_regexp_match(self):
+ assert_raises_message(
+ exc.CompileError,
+ "default dialect does not support regular expressions",
+ self.table.c.myid.regexp_match("pattern").compile,
+ dialect=default.DefaultDialect(),
+ )
+
+ def test_not_regexp_match(self):
+ assert_raises_message(
+ exc.CompileError,
+ "default dialect does not support regular expressions",
+ (~self.table.c.myid.regexp_match("pattern")).compile,
+ dialect=default.DefaultDialect(),
+ )
+
+ def test_regexp_replace(self):
+ assert_raises_message(
+ exc.CompileError,
+ "default dialect does not support regular expression replacements",
+ self.table.c.myid.regexp_replace("pattern", "rep").compile,
+ dialect=default.DefaultDialect(),
+ )
+
+
+class RegexpTestStrCompiler(fixtures.TestBase, testing.AssertsCompiledSQL):
+ __dialect__ = "default_enhanced"
+
+ def setUp(self):
+ self.table = table(
+ "mytable", column("myid", Integer), column("name", String)
+ )
+
+ def test_regexp_match(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_match("pattern"),
+ "mytable.myid <regexp> :myid_1",
+ checkparams={"myid_1": "pattern"},
+ )
+
+ def test_regexp_match_column(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_match(self.table.c.name),
+ "mytable.myid <regexp> mytable.name",
+ checkparams={},
+ )
+
+ def test_regexp_match_str(self):
+ self.assert_compile(
+ literal("string").regexp_match(self.table.c.name),
+ ":param_1 <regexp> mytable.name",
+ checkparams={"param_1": "string"},
+ )
+
+ def test_regexp_match_flags(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_match("pattern", flags="ig"),
+ "mytable.myid <regexp> :myid_1",
+ checkparams={"myid_1": "pattern"},
+ )
+
+ def test_not_regexp_match(self):
+ self.assert_compile(
+ ~self.table.c.myid.regexp_match("pattern"),
+ "mytable.myid <not regexp> :myid_1",
+ checkparams={"myid_1": "pattern"},
+ )
+
+ def test_not_regexp_match_column(self):
+ self.assert_compile(
+ ~self.table.c.myid.regexp_match(self.table.c.name),
+ "mytable.myid <not regexp> mytable.name",
+ checkparams={},
+ )
+
+ def test_not_regexp_match_str(self):
+ self.assert_compile(
+ ~literal("string").regexp_match(self.table.c.name),
+ ":param_1 <not regexp> mytable.name",
+ checkparams={"param_1": "string"},
+ )
+
+ def test_not_regexp_match_flags(self):
+ self.assert_compile(
+ ~self.table.c.myid.regexp_match("pattern", flags="ig"),
+ "mytable.myid <not regexp> :myid_1",
+ checkparams={"myid_1": "pattern"},
+ )
+
+ def test_regexp_replace(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_replace("pattern", "replacement"),
+ "<regexp replace>(mytable.myid, :myid_1, :myid_2)",
+ checkparams={"myid_1": "pattern", "myid_2": "replacement"},
+ )
+
+ def test_regexp_replace_column(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_replace("pattern", self.table.c.name),
+ "<regexp replace>(mytable.myid, :myid_1, mytable.name)",
+ checkparams={"myid_1": "pattern"},
+ )
+
+ def test_regexp_replace_column2(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_replace(self.table.c.name, "replacement"),
+ "<regexp replace>(mytable.myid, mytable.name, :myid_1)",
+ checkparams={"myid_1": "replacement"},
+ )
+
+ def test_regexp_replace_string(self):
+ self.assert_compile(
+ literal("string").regexp_replace("pattern", self.table.c.name),
+ "<regexp replace>(:param_1, :param_2, mytable.name)",
+ checkparams={"param_2": "pattern", "param_1": "string"},
+ )
+
+ def test_regexp_replace_flags(self):
+ self.assert_compile(
+ self.table.c.myid.regexp_replace(
+ "pattern", "replacement", flags="ig"
+ ),
+ "<regexp replace>(mytable.myid, :myid_1, :myid_2)",
+ checkparams={"myid_1": "pattern", "myid_2": "replacement"},
+ )
+
+ def test_regexp_precedence_1(self):
+ self.assert_compile(
+ and_(
+ self.table.c.myid.match("foo"),
+ self.table.c.myid.regexp_match("xx"),
+ ),
+ "mytable.myid MATCH :myid_1 AND " "mytable.myid <regexp> :myid_2",
+ )
+ self.assert_compile(
+ and_(
+ self.table.c.myid.match("foo"),
+ ~self.table.c.myid.regexp_match("xx"),
+ ),
+ "mytable.myid MATCH :myid_1 AND "
+ "mytable.myid <not regexp> :myid_2",
+ )
+ self.assert_compile(
+ and_(
+ self.table.c.myid.match("foo"),
+ self.table.c.myid.regexp_replace("xx", "yy"),
+ ),
+ "mytable.myid MATCH :myid_1 AND "
+ "<regexp replace>(mytable.myid, :myid_2, :myid_3)",
+ )
+
+ def test_regexp_precedence_2(self):
+ self.assert_compile(
+ self.table.c.myid + self.table.c.myid.regexp_match("xx"),
+ "mytable.myid + (mytable.myid <regexp> :myid_1)",
+ )
+ self.assert_compile(
+ self.table.c.myid + ~self.table.c.myid.regexp_match("xx"),
+ "mytable.myid + (mytable.myid <not regexp> :myid_1)",
+ )
+ self.assert_compile(
+ self.table.c.myid + self.table.c.myid.regexp_replace("xx", "yy"),
+ "mytable.myid + ("
+ "<regexp replace>(mytable.myid, :myid_1, :myid_2))",
+ )
+
+
class ComposedLikeOperatorsTest(fixtures.TestBase, testing.AssertsCompiledSQL):
__dialect__ = "default"