series as well. For changes that are specific to 1.0 with an emphasis
on compatibility concerns, see :doc:`/changelog/migration_10`.
+ .. change::
+ :tags: feature, engine
+ :tickets: 3178
+
+ A new style of warning can be emitted which will "filter" up to
+ N occurrences of a parameterized string. This allows parameterized
+ warnings that can refer to their arguments to be delivered a fixed
+ number of times until allowing Python warning filters to squelch them,
+ and prevents memory from growing unbounded within Python's
+ warning registries.
+
+ .. seealso::
+
+ :ref:`feature_3178`
+
.. change::
:tags: feature, orm
:ticket:`3176`
+.. _feature_3178:
+
+New systems to safely emit parameterized warnings
+-------------------------------------------------
+
+For a long time, there has been a restriction that warning messages could not
+refer to data elements, such that a particular function might emit an
+infinite number of unique warnings. The key place this occurs is in the
+``Unicode type received non-unicode bind param value`` warning. Placing
+the data value in this message would mean that the Python ``__warningregistry__``
+for that module, or in some cases the Python-global ``warnings.onceregistry``,
+would grow unbounded, as in most warning scenarios, one of these two collections
+is populated with every distinct warning message.
+
+The change here is that by using a special ``string`` type that purposely
+changes how the string is hashed, we can control that a large number of
+parameterized messages are hashed only on a small set of possible hash
+values, such that a warning such as ``Unicode type received non-unicode
+bind param value`` can be tailored to be emitted only a specific number
+of times; beyond that, the Python warnings registry will begin recording
+them as duplicates.
+
+To illustrate, the following test script will show only ten warnings being
+emitted for ten of the parameter sets, out of a total of 1000:
+
+ from sqlalchemy import create_engine, Unicode, select, cast
+ import random
+ import warnings
+
+ e = create_engine("sqlite://")
+
+ # Use the "once" filter (which is also the default for Python
+ # warnings). Exactly ten of these warnings will
+ # be emitted; beyond that, the Python warnings registry will accumulate
+ # new values as dupes of one of the ten existing.
+ warnings.filterwarnings("once")
+
+ for i in range(1000):
+ e.execute(select([cast(
+ ('foo_%d' % random.randint(0, 1000000)).encode('ascii'), Unicode)]))
+
+The format of the warning here is::
+
+ /path/lib/sqlalchemy/sql/sqltypes.py:186: SAWarning: Unicode type received
+ non-unicode bind param value 'foo_4852'. (this warning may be
+ suppressed after 10 occurrences)
+
+
+:ticket:`3178`
.. _feature_2963:
if (_none_set.issubset(identity_key) and
not mapper.allow_partial_pks) or \
_none_set.issuperset(identity_key):
- util.warn("Instance %s to be refreshed doesn't "
- "contain a full primary key - can't be refreshed "
- "(and shouldn't be expired, either)."
- % state_str(state))
+ util.warn_limited(
+ "Instance %s to be refreshed doesn't "
+ "contain a full primary key - can't be refreshed "
+ "(and shouldn't be expired, either).",
+ state_str(state))
return
result = load_on_ident(
if polymorphic_key in dict_ and \
dict_[polymorphic_key] not in \
mapper._acceptable_polymorphic_identities:
- util.warn(
+ util.warn_limited(
"Flushing object %s with "
"incompatible polymorphic identity %r; the "
- "object may not refresh and/or load correctly" % (
- state_str(state),
- dict_[polymorphic_key]
- )
+ "object may not refresh and/or load correctly",
+ state_str(state),
+ dict_[polymorphic_key]
)
self._set_polymorphic_identity = _set_polymorphic_identity
elif needs_version_id:
util.warn("Dialect %s does not support updated rowcount "
"- versioning cannot be verified." %
- c.dialect.dialect_description,
- stacklevel=12)
+ c.dialect.dialect_description)
def _emit_insert_statements(base_mapper, uowtransaction,
else:
if getattr(self.type, '_warn_on_bytestring', False):
if isinstance(self.default, util.binary_type):
- util.warn("Unicode column received non-unicode "
- "default value.")
+ util.warn(
+ "Unicode column '%s' has non-unicode "
+ "default value %r specified." % (
+ self.key,
+ self.default
+ ))
args.append(ColumnDefault(self.default))
if self.server_default is not None:
if self._warn_on_bytestring:
def process(value):
if isinstance(value, util.binary_type):
- util.warn("Unicode type received non-unicode"
- "bind param value.")
+ util.warn_limited(
+ "Unicode type received non-unicode "
+ "bind param value %r.",
+ util.ellipses_string(value))
return value
return process
else:
if isinstance(value, util.text_type):
return encoder(value, self.unicode_error)[0]
elif warn_on_bytestring and value is not None:
- util.warn("Unicode type received non-unicode bind "
- "param value")
+ util.warn_limited(
+ "Unicode type received non-unicode bind "
+ "param value %r.",
+ util.ellipses_string(value))
return value
return process
else:
# the MIT License: http://www.opensource.org/licenses/mit-license.php
-from .warnings import testing_warn, assert_warnings, resetwarnings
+from .warnings import assert_warnings
from . import config
from . import util as testutil
from sqlalchemy import pool, orm, util
-from sqlalchemy.engine import default, create_engine, url
-from sqlalchemy import exc as sa_exc
+from sqlalchemy.engine import default, url
from sqlalchemy.util import decorator
from sqlalchemy import types as sqltypes, schema
import warnings
import re
-from .warnings import resetwarnings
from .exclusions import db_spec, _is_excluded
from . import assertsql
from . import config
-import itertools
from .util import fail
import contextlib
+from . import mock
-def emits_warning(*messages):
- """Mark a test as emitting a warning.
+def expect_warnings(*messages):
+ """Context manager which expects one or more warnings.
+
+ With no arguments, squelches all SAWarnings emitted via
+ sqlalchemy.util.warn and sqlalchemy.util.warn_limited. Otherwise
+ pass string expressions that will match selected warnings via regex;
+ all non-matching warnings are sent through.
+
+ Note that the test suite sets SAWarning warnings to raise exceptions.
+
+ """
+ return _expect_warnings(
+ "sqlalchemy.util.deprecations.warnings.warn", messages)
+
+
+@contextlib.contextmanager
+def expect_warnings_on(db, *messages):
+ """Context manager which expects one or more warnings on specific
+ dialects.
- With no arguments, squelches all SAWarning failures. Or pass one or more
- strings; these will be matched to the root of the warning description by
- warnings.filterwarnings().
"""
- # TODO: it would be nice to assert that a named warning was
- # emitted. should work with some monkeypatching of warnings,
- # and may work on non-CPython if they keep to the spirit of
- # warnings.showwarning's docstring.
- # - update: jython looks ok, it uses cpython's module
+ spec = db_spec(db)
+
+ if isinstance(db, util.string_types) and not spec(config._current):
+ yield
+ elif not _is_excluded(*db):
+ yield
+ else:
+ with expect_warnings(*messages):
+ yield
+
+
+def emits_warning(*messages):
+ """Decorator form of expect_warnings()."""
@decorator
def decorate(fn, *args, **kw):
- # todo: should probably be strict about this, too
- filters = [dict(action='ignore',
- category=sa_exc.SAPendingDeprecationWarning)]
- if not messages:
- filters.append(dict(action='ignore',
- category=sa_exc.SAWarning))
- else:
- filters.extend(dict(action='ignore',
- message=message,
- category=sa_exc.SAWarning)
- for message in messages)
- for f in filters:
- warnings.filterwarnings(**f)
- try:
+ with expect_warnings(*messages):
return fn(*args, **kw)
- finally:
- resetwarnings()
+
return decorate
-def emits_warning_on(db, *warnings):
+def expect_deprecated(*messages):
+ return _expect_warnings(
+ "sqlalchemy.util.deprecations.warnings.warn", messages)
+
+
+def emits_warning_on(db, *messages):
"""Mark a test as emitting a warning on a specific dialect.
With no arguments, squelches all SAWarning failures. Or pass one or more
strings; these will be matched to the root of the warning description by
warnings.filterwarnings().
"""
- spec = db_spec(db)
-
@decorator
def decorate(fn, *args, **kw):
- if isinstance(db, util.string_types):
- if not spec(config._current):
- return fn(*args, **kw)
- else:
- wrapped = emits_warning(*warnings)(fn)
- return wrapped(*args, **kw)
- else:
- if not _is_excluded(*db):
- return fn(*args, **kw)
- else:
- wrapped = emits_warning(*warnings)(fn)
- return wrapped(*args, **kw)
+ with expect_warnings_on(db, *messages):
+ return fn(*args, **kw)
+
return decorate
@contextlib.contextmanager
-def expect_deprecated(*messages):
- # todo: should probably be strict about this, too
- filters = [dict(action='ignore',
- category=sa_exc.SAPendingDeprecationWarning)]
- if not messages:
- filters.append(dict(action='ignore',
- category=sa_exc.SADeprecationWarning))
- else:
- filters.extend(
- [dict(action='ignore',
- message=message,
- category=sa_exc.SADeprecationWarning)
- for message in
- [(m.startswith('//') and
- ('Call to deprecated function ' + m[2:]) or m)
- for m in messages]])
-
- for f in filters:
- warnings.filterwarnings(**f)
- try:
+def _expect_warnings(to_patch, messages):
+
+ filters = [re.compile(msg, re.I) for msg in messages]
+
+ real_warn = warnings.warn
+
+ def our_warn(msg, exception, *arg, **kw):
+ if not filters:
+ return
+
+ for filter_ in filters:
+ if filter_.match(msg):
+ break
+ else:
+ real_warn(msg, exception, *arg, **kw)
+
+ with mock.patch(to_patch, our_warn):
yield
- finally:
- resetwarnings()
def global_cleanup_assertions():
from sqlalchemy.testing import fixtures, engines, exclusions, \
assertions, warnings, profiling, config
from sqlalchemy import util
-
+ warnings.setup_filters()
def _log(opt_str, value, parser):
global logging
id_ = "%s.%s.%s" % (test_module_name, name, test_name)
- warnings.resetwarnings()
profiling._current_test = id_
def after_test(test):
engines.testing_reaper._after_test_ctx()
- warnings.resetwarnings()
def _possible_configs_for_cls(cls, reasons=None):
import warnings
from .. import exc as sa_exc
-from .. import util
import re
-def testing_warn(msg, stacklevel=3):
- """Replaces sqlalchemy.util.warn during tests."""
-
- filename = "sqlalchemy.testing.warnings"
- lineno = 1
- if isinstance(msg, util.string_types):
- warnings.warn_explicit(msg, sa_exc.SAWarning, filename, lineno)
- else:
- warnings.warn_explicit(msg, filename, lineno)
-
-
-def resetwarnings():
- """Reset warning behavior to testing defaults."""
-
- util.warn = util.langhelpers.warn = testing_warn
+def setup_filters():
+ """Set global warning behavior for the test suite."""
warnings.filterwarnings('ignore',
category=sa_exc.SAPendingDeprecationWarning)
warnings.filterwarnings('error', category=sa_exc.SAWarning)
-def assert_warnings(fn, warnings, regex=False):
+def assert_warnings(fn, warning_msgs, regex=False):
"""Assert that each of the given warnings are emitted by fn."""
- from .assertions import eq_, emits_warning
+ from .assertions import eq_
- canary = []
- orig_warn = util.warn
+ with warnings.catch_warnings(record=True) as log:
+ # ensure that nothing is going into __warningregistry__
+ warnings.filterwarnings("always")
- def capture_warnings(*args, **kw):
- orig_warn(*args, **kw)
- popwarn = warnings.pop(0)
- canary.append(popwarn)
+ result = fn()
+ for warning in log:
+ popwarn = warning_msgs.pop(0)
if regex:
- assert re.match(popwarn, args[0])
+ assert re.match(popwarn, str(warning.message))
else:
- eq_(args[0], popwarn)
- util.warn = util.langhelpers.warn = capture_warnings
-
- result = emits_warning()(fn)()
- assert canary, "No warning was emitted"
+ eq_(popwarn, str(warning.message))
return result
classproperty, set_creation_order, warn_exception, warn, NoneType,\
constructor_copy, methods_equivalent, chop_traceback, asint,\
generic_repr, counter, PluginLoader, hybridmethod, safe_reraise,\
- get_callable_argspec, only_once, attrsetter
+ get_callable_argspec, only_once, attrsetter, ellipses_string, \
+ warn_limited
from .deprecations import warn_deprecated, warn_pending_deprecation, \
deprecated, pending_deprecation, inject_docstring_text
@decorator
def warned(fn, *args, **kwargs):
- warnings.warn(wtype(message), stacklevel=3)
+ warnings.warn(message, wtype, stacklevel=3)
return fn(*args, **kwargs)
doc = func.__doc__ is not None and func.__doc__ or ''
warn("%s('%s') ignored" % sys.exc_info()[0:2])
-def warn(msg, stacklevel=3):
+def ellipses_string(value, len_=25):
+ if len(value) > len_:
+ return "%s..." % value[0:len_]
+ else:
+ return value
+
+
+class _hash_limit_string(compat.text_type):
+ """A string subclass that can only be hashed on a maximum amount
+ of unique values.
+
+ This is used for warnings so that we can send out parameterized warnings
+ without the __warningregistry__ of the module, or the non-overridable
+ "once" registry within warnings.py, overloading memory,
+
+
+ """
+ def __new__(cls, value, args, num):
+ interpolated = value % args + \
+ (" (this warning may be suppressed after %d occurrences)" % num)
+ self = super(_hash_limit_string, cls).__new__(cls, interpolated)
+ self._hash = hash("%s_%d" % (value, hash(interpolated) % num))
+ return self
+
+ def __hash__(self):
+ return self._hash
+
+ def __eq__(self, other):
+ return hash(self) == hash(other)
+
+
+def warn(msg):
"""Issue a warning.
If msg is a string, :class:`.exc.SAWarning` is used as
the category.
- .. note::
+ """
+ warnings.warn(msg, exc.SAWarning, stacklevel=2)
+
- This function is swapped out when the test suite
- runs, with a compatible version that uses
- warnings.warn_explicit, so that the warnings registry can
- be controlled.
+def warn_limited(msg, *args):
+ """Issue a warning with a paramterized string, limiting the number
+ of registrations.
"""
- if isinstance(msg, compat.string_types):
- warnings.warn(msg, exc.SAWarning, stacklevel=stacklevel)
- else:
- warnings.warn(msg, stacklevel=stacklevel)
+ if args:
+ msg = _hash_limit_string(msg, args, 10)
+ warnings.warn(msg, exc.SAWarning, stacklevel=2)
def only_once(fn):
import decimal
import gc
from sqlalchemy.testing import fixtures
+from sqlalchemy import util
import weakref
+import itertools
class A(fixtures.ComparableEntity):
finally:
metadata.drop_all()
+ def test_warnings_util(self):
+ counter = itertools.count()
+ import warnings
+ warnings.filterwarnings("ignore", "memusage warning.*")
+
+ @profile_memory()
+ def go():
+ util.warn_limited(
+ "memusage warning, param1: %s, param2: %s",
+ next(counter), next(counter))
+ go()
+
def test_mapper_reset(self):
metadata = MetaData(self.engine)
def test_extra_dirty_state_post_flush_state(self):
s, a1, a2 = self._test_extra_dirty_state()
canary = []
+
@event.listens_for(s, "after_flush_postexec")
def e(sess, ctx):
canary.append(bool(sess.identity_map._modified))
default = b('foo')
assert_raises_message(
sa.exc.SAWarning,
- "Unicode column received non-unicode default value.",
+ "Unicode column 'foobar' has non-unicode "
+ "default value 'foo' specified.",
Column,
- Unicode(32),
+ "foobar", Unicode(32),
default=default
)