--- /dev/null
+.. change::
+ :tags: bug, sql
+ :tickets: 8098
+
+ Fixed multiple observed race conditions related to :func:`.lambda_stmt`,
+ including an initial "dogpile" issue when a new Python code object is
+ initially analyzed among multiple simultaneous threads which created both a
+ performance issue as well as some internal corruption of state.
+ Additionally repaired observed race condition which could occur when
+ "cloning" an expression construct that is also in the process of being
+ compiled or otherwise accessed in a different thread due to memoized
+ attributes altering the ``__dict__`` while iterated, for Python versions
+ prior to 3.10; in particular the lambda SQL construct is sensitive to this
+ as it holds onto a single statement object persistently. The iteration has
+ been refined to use ``dict.copy()`` with or without an additional iteration
+ instead.
"""
skip = self._memoized_keys
c = self.__class__.__new__(self.__class__)
- c.__dict__ = {k: v for k, v in self.__dict__.items() if k not in skip}
+
+ if skip:
+ # ensure this iteration remains atomic
+ c.__dict__ = {
+ k: v for k, v in self.__dict__.copy().items() if k not in skip
+ }
+ else:
+ c.__dict__ = self.__dict__.copy()
# this is a marker that helps to "equate" clauses to each other
# when a Select returns its list of FROM clauses. the cloning
import itertools
import operator
import sys
+import threading
import types
import weakref
if rec is None:
if cache_key is not traversals.NO_CACHE:
- rec = AnalyzedFunction(
- tracker, self, apply_propagate_attrs, fn
- )
- rec.closure_bindparams = bindparams
- lambda_cache[tracker_key + cache_key] = rec
+
+ with AnalyzedCode._generation_mutex:
+ key = tracker_key + cache_key
+ if key not in lambda_cache:
+ rec = AnalyzedFunction(
+ tracker, self, apply_propagate_attrs, fn
+ )
+ rec.closure_bindparams = bindparams
+ lambda_cache[key] = rec
+ else:
+ rec = lambda_cache[key]
else:
rec = NonAnalyzedFunction(self._invoke_user_fn(fn))
)
_fns = weakref.WeakKeyDictionary()
+ _generation_mutex = threading.RLock()
+
@classmethod
def get(cls, fn, lambda_element, lambda_kw, **kw):
try:
return cls._fns[fn.__code__]
except KeyError:
pass
- cls._fns[fn.__code__] = analyzed = AnalyzedCode(
- fn, lambda_element, lambda_kw, **kw
- )
- return analyzed
+
+ with cls._generation_mutex:
+ # check for other thread already created object
+ if fn.__code__ in cls._fns:
+ return cls._fns[fn.__code__]
+
+ cls._fns[fn.__code__] = analyzed = AnalyzedCode(
+ fn, lambda_element, lambda_kw, **kw
+ )
+ return analyzed
def __init__(self, fn, lambda_element, opts):
if inspect.ismethod(fn):