_registry = util.defaultdict(dict)
+_case_sensitive_registry = util.defaultdict(
+ lambda: util.defaultdict(dict)
+)
+_CASE_SENSITIVE = util.symbol(
+ name="case_sensitive_function",
+ doc="Symbol to mark the functions that are switched into case-sensitive "
+ "mode.")
def register_function(identifier, fn, package="_default"):
"""
reg = _registry[package]
- reg[identifier] = fn
+ case_sensitive_reg = _case_sensitive_registry[package]
+ raw_identifier = identifier
+ identifier = identifier.lower()
+
+ # Check if a function with the same lowercase identifier is registered.
+ if identifier in reg and reg[identifier] is not _CASE_SENSITIVE:
+ if raw_identifier in case_sensitive_reg[identifier]:
+ util.warn(
+ "The GenericFunction '{}' is already registered and "
+ "is going to be overriden.".format(identifier))
+ reg[identifier] = fn
+ else:
+ # If a function with the same lowercase identifier is registered,
+ # then these 2 functions are considered as case-sensitive.
+ # Note: This case should raise an error in a later release.
+ util.warn_deprecated(
+ "GenericFunction '{}' is already registered with "
+ "different letter case, so the previously registered function "
+ "'{}' is switched into case-sensitive mode. "
+ "GenericFunction objects will be fully case-insensitive in a "
+ "future release.".format(
+ raw_identifier,
+ list(case_sensitive_reg[identifier].keys())[0],
+ ))
+ reg[identifier] = _CASE_SENSITIVE
+
+ # Check if a function with different letter case identifier is registered.
+ elif identifier in case_sensitive_reg:
+ # Note: This case will be removed in a later release.
+ if (
+ raw_identifier not in case_sensitive_reg[identifier]
+ ):
+ util.warn_deprecated(
+ "GenericFunction(s) '{}' are already registered with "
+ "different letter cases and might interact with '{}'. "
+ "GenericFunction objects will be fully case-insensitive in a "
+ "future release.".format(
+ sorted(case_sensitive_reg[identifier].keys()),
+ raw_identifier))
+
+ else:
+ util.warn(
+ "The GenericFunction '{}' is already registered and "
+ "is going to be overriden.".format(raw_identifier))
+
+ # Register by default
+ else:
+ reg[identifier] = fn
+
+ # Always register in case-sensitive registry
+ case_sensitive_reg[identifier][raw_identifier] = fn
class FunctionElement(Executable, ColumnElement, FromClause):
package = None
if package is not None:
- func = _registry[package].get(fname)
+ func = _registry[package].get(fname.lower())
+ if func is _CASE_SENSITIVE:
+ case_sensitive_reg = _case_sensitive_registry[package]
+ func = case_sensitive_reg.get(fname.lower()).get(fname)
+
if func is not None:
return func(*c, **o)
#! coding: utf-8
+from copy import deepcopy
+
+import pytest
+
from sqlalchemy import bindparam
from sqlalchemy import Column
from sqlalchemy import column
from sqlalchemy import create_engine
+from sqlalchemy import DateTime
from sqlalchemy import exc
from sqlalchemy import ForeignKey
+from sqlalchemy import func
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import select
from sqlalchemy import util
from sqlalchemy.engine import default
from sqlalchemy.schema import DDL
+from sqlalchemy.sql import functions
from sqlalchemy.sql import util as sql_util
+from sqlalchemy.sql.functions import GenericFunction
+from sqlalchemy.sql.sqltypes import NullType
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
+from sqlalchemy.testing import in_
from sqlalchemy.testing import mock
+from sqlalchemy.testing import not_in_
class DeprecationWarningsTest(fixtures.TestBase):
)
+class CaseSensitiveFunctionDeprecationsTest(fixtures.TestBase):
+
+ def setup(self):
+ self._registry = deepcopy(functions._registry)
+ self._case_sensitive_registry = deepcopy(
+ functions._case_sensitive_registry)
+ functions._registry.clear()
+ functions._case_sensitive_registry.clear()
+
+ def teardown(self):
+ functions._registry = self._registry
+ functions._case_sensitive_registry = self._case_sensitive_registry
+
+ def test_case_sensitive(self):
+ reg = functions._registry['_default']
+ cs_reg = functions._case_sensitive_registry['_default']
+
+ class MYFUNC(GenericFunction):
+ type = DateTime
+
+ assert isinstance(func.MYFUNC().type, DateTime)
+ assert isinstance(func.MyFunc().type, DateTime)
+ assert isinstance(func.mYfUnC().type, DateTime)
+ assert isinstance(func.myfunc().type, DateTime)
+
+ in_("myfunc", reg)
+ not_in_("MYFUNC", reg)
+ not_in_("MyFunc", reg)
+ in_("myfunc", cs_reg)
+ eq_(set(cs_reg['myfunc'].keys()), set(['MYFUNC']))
+
+ with testing.expect_deprecated(
+ "GenericFunction 'MyFunc' is already registered with"
+ " different letter case, so the previously registered function "
+ "'MYFUNC' is switched into case-sensitive mode. "
+ "GenericFunction objects will be fully case-insensitive in a "
+ "future release.",
+ regex=False
+ ):
+ class MyFunc(GenericFunction):
+ type = Integer
+
+ assert isinstance(func.MYFUNC().type, DateTime)
+ assert isinstance(func.MyFunc().type, Integer)
+ with pytest.raises(AssertionError):
+ assert isinstance(func.mYfUnC().type, Integer)
+ with pytest.raises(AssertionError):
+ assert isinstance(func.myfunc().type, Integer)
+
+ eq_(reg["myfunc"], functions._CASE_SENSITIVE)
+ not_in_("MYFUNC", reg)
+ not_in_("MyFunc", reg)
+ in_("myfunc", cs_reg)
+ eq_(set(cs_reg['myfunc'].keys()), set(['MYFUNC', 'MyFunc']))
+
+ def test_replace_function_case_sensitive(self):
+ reg = functions._registry['_default']
+ cs_reg = functions._case_sensitive_registry['_default']
+
+ class replaceable_func(GenericFunction):
+ type = Integer
+ identifier = 'REPLACEABLE_FUNC'
+
+ assert isinstance(func.REPLACEABLE_FUNC().type, Integer)
+ assert isinstance(func.Replaceable_Func().type, Integer)
+ assert isinstance(func.RePlAcEaBlE_fUnC().type, Integer)
+ assert isinstance(func.replaceable_func().type, Integer)
+
+ in_("replaceable_func", reg)
+ not_in_("REPLACEABLE_FUNC", reg)
+ not_in_("Replaceable_Func", reg)
+ in_("replaceable_func", cs_reg)
+ eq_(set(cs_reg['replaceable_func'].keys()), set(['REPLACEABLE_FUNC']))
+
+ with testing.expect_deprecated(
+ "GenericFunction 'Replaceable_Func' is already registered with"
+ " different letter case, so the previously registered function "
+ "'REPLACEABLE_FUNC' is switched into case-sensitive mode. "
+ "GenericFunction objects will be fully case-insensitive in a "
+ "future release.",
+ regex=False
+ ):
+ class Replaceable_Func(GenericFunction):
+ type = DateTime
+ identifier = 'Replaceable_Func'
+
+ assert isinstance(func.REPLACEABLE_FUNC().type, Integer)
+ assert isinstance(func.Replaceable_Func().type, DateTime)
+ assert isinstance(func.RePlAcEaBlE_fUnC().type, NullType)
+ assert isinstance(func.replaceable_func().type, NullType)
+
+ eq_(reg["replaceable_func"], functions._CASE_SENSITIVE)
+ not_in_("REPLACEABLE_FUNC", reg)
+ not_in_("Replaceable_Func", reg)
+ in_("replaceable_func", cs_reg)
+ eq_(set(cs_reg['replaceable_func'].keys()),
+ set(['REPLACEABLE_FUNC', 'Replaceable_Func']))
+
+ with testing.expect_warnings(
+ "The GenericFunction 'REPLACEABLE_FUNC' is already registered and "
+ "is going to be overriden.",
+ regex=False
+ ):
+ class replaceable_func_override(GenericFunction):
+ type = DateTime
+ identifier = 'REPLACEABLE_FUNC'
+
+ with testing.expect_deprecated(
+ "GenericFunction(s) '['REPLACEABLE_FUNC', 'Replaceable_Func']' "
+ "are already registered with different letter cases and might "
+ "interact with 'replaceable_func'. GenericFunction objects will "
+ "be fully case-insensitive in a future release.",
+ regex=False
+ ):
+ class replaceable_func_lowercase(GenericFunction):
+ type = String
+ identifier = 'replaceable_func'
+
+ with testing.expect_warnings(
+ "The GenericFunction 'Replaceable_Func' is already registered and "
+ "is going to be overriden.",
+ regex=False
+ ):
+ class Replaceable_Func_override(GenericFunction):
+ type = Integer
+ identifier = 'Replaceable_Func'
+
+ assert isinstance(func.REPLACEABLE_FUNC().type, DateTime)
+ assert isinstance(func.Replaceable_Func().type, Integer)
+ assert isinstance(func.RePlAcEaBlE_fUnC().type, NullType)
+ assert isinstance(func.replaceable_func().type, String)
+
+ eq_(reg["replaceable_func"], functions._CASE_SENSITIVE)
+ not_in_("REPLACEABLE_FUNC", reg)
+ not_in_("Replaceable_Func", reg)
+ in_("replaceable_func", cs_reg)
+ eq_(set(cs_reg['replaceable_func'].keys()),
+ set(['REPLACEABLE_FUNC', 'Replaceable_Func', 'replaceable_func']))
+
+
class DDLListenerDeprecationsTest(fixtures.TestBase):
def setup(self):
self.bind = self.engine = engines.mock_engine()
+from copy import deepcopy
import datetime
import decimal
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
+from sqlalchemy.testing.assertions import expect_warnings
from sqlalchemy.testing.engines import all_dialects
class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = "default"
- def tear_down(self):
- functions._registry.clear()
+ def setup(self):
+ self._registry = deepcopy(functions._registry)
+ self._case_sensitive_registry = deepcopy(
+ functions._case_sensitive_registry)
+
+ def teardown(self):
+ functions._registry = self._registry
+ functions._case_sensitive_registry = self._case_sensitive_registry
def test_compile(self):
for dialect in all_dialects(exclude=("sybase",)):
dialect=dialect,
)
+ functions._registry['_default'].pop('fake_func')
+ functions._case_sensitive_registry['_default'].pop('fake_func')
+
def test_use_labels(self):
self.assert_compile(
select([func.foo()], use_labels=True), "SELECT foo() AS foo_1"
def test_uppercase(self):
# for now, we need to keep case insensitivity
- self.assert_compile(func.NOW(), "NOW()")
+ self.assert_compile(func.UNREGISTERED_FN(), "UNREGISTERED_FN()")
def test_uppercase_packages(self):
# for now, we need to keep case insensitivity
assert isinstance(func.myfunc().type, DateTime)
+ def test_case_sensitive(self):
+ class MYFUNC(GenericFunction):
+ type = DateTime
+
+ assert isinstance(func.MYFUNC().type, DateTime)
+ assert isinstance(func.MyFunc().type, DateTime)
+ assert isinstance(func.mYfUnC().type, DateTime)
+ assert isinstance(func.myfunc().type, DateTime)
+
+ def test_replace_function(self):
+ class replaceable_func(GenericFunction):
+ type = Integer
+ identifier = 'replaceable_func'
+
+ assert isinstance(func.Replaceable_Func().type, Integer)
+ assert isinstance(func.RePlAcEaBlE_fUnC().type, Integer)
+ assert isinstance(func.replaceable_func().type, Integer)
+
+ with expect_warnings(
+ "The GenericFunction 'replaceable_func' is already registered and "
+ "is going to be overriden.",
+ regex=False
+ ):
+ class replaceable_func_override(GenericFunction):
+ type = DateTime
+ identifier = 'replaceable_func'
+
+ assert isinstance(func.Replaceable_Func().type, DateTime)
+ assert isinstance(func.RePlAcEaBlE_fUnC().type, DateTime)
+ assert isinstance(func.replaceable_func().type, DateTime)
+
def test_custom_w_custom_name(self):
class myfunc(GenericFunction):
name = "notmyfunc"
type = Integer
identifier = "buf3"
+ class GeoBufferFour(GenericFunction):
+ type = Integer
+ name = "BufferFour"
+ identifier = "Buf4"
+
self.assert_compile(func.geo.buf1(), "BufferOne()")
self.assert_compile(func.buf2(), "BufferTwo()")
self.assert_compile(func.buf3(), "BufferThree()")
+ self.assert_compile(func.Buf4(), "BufferFour()")
+ self.assert_compile(func.BuF4(), "BufferFour()")
+ self.assert_compile(func.bUf4(), "BufferFour()")
+ self.assert_compile(func.bUf4_(), "BufferFour()")
+ self.assert_compile(func.buf4(), "BufferFour()")
def test_custom_args(self):
class myfunc(GenericFunction):