From: Lysandros Nikolaou Date: Fri, 26 Sep 2025 14:16:41 +0000 (-0400) Subject: Add explicit multi-threaded tests and support free-threaded build X-Git-Tag: rel_2_0_44~14^2 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=a0b02abfda75fd3f85cd4d402eed7f3abf58d5dd;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git Add explicit multi-threaded tests and support free-threaded build Implemented initial support for free-threaded Python by adding new tests and reworking the test harness and GitHub Actions to include Python 3.13t and Python 3.14t in test runs. Two concurrency issues have been identified and fixed: the first involves initialization of the ``.c`` collection on a ``FromClause``, a continuation of :ticket:`12302`, where an optional mutex under free-threading is added; the second involves synchronization of the pool "first_connect" event, which first received thread synchronization in :ticket:`2964`, however under free-threading the creation of the mutex itself runs under the same free-threading mutex. Initial pull request and test suite courtesy Lysandros Nikolaou. py313t: yes py314t: yes Fixes: #12881 Closes: #12882 Pull-request: https://github.com/sqlalchemy/sqlalchemy/pull/12882 Pull-request-sha: 53d65d96b979b1afbdb49e6394979cfd598c9a34 Co-authored-by: Mike Bayer Change-Id: I2e4f2e9ac974ab6382cb0520cc446b396d9680a6 (cherry picked from commit 456727df50c7dc29ddc31c6f67c1be21771386fa) --- diff --git a/.github/workflows/run-on-pr.yaml b/.github/workflows/run-on-pr.yaml index 889da8499f..3e2f1b39f6 100644 --- a/.github/workflows/run-on-pr.yaml +++ b/.github/workflows/run-on-pr.yaml @@ -27,6 +27,7 @@ jobs: python-version: - "3.13" build-type: + - "cext-greenlet" - "cext" - "nocext" architecture: @@ -52,7 +53,7 @@ jobs: pip list - name: Run tests - run: tox -e github-${{ matrix.build-type }} -- -q --nomemory --notimingintensive ${{ matrix.pytest-args }} + run: tox -e github-${{ matrix.build-type }} -- ${{ matrix.pytest-args }} run-tox: name: ${{ matrix.tox-env }}-${{ matrix.python-version }} diff --git a/.github/workflows/run-test.yaml b/.github/workflows/run-test.yaml index fef34d4908..15a7f9a510 100644 --- a/.github/workflows/run-test.yaml +++ b/.github/workflows/run-test.yaml @@ -39,8 +39,14 @@ jobs: - "3.11" - "3.12" - "3.13" + - "3.14.0-alpha - 3.14" - "pypy-3.10" build-type: + # builds greenlet, runs asyncio tests. includes aiosqlite driver + - "cext-greenlet" + + # these do not install greenlet at all and skip asyncio tests. + # does not include aiosqlite driver - "cext" - "nocext" architecture: @@ -52,14 +58,35 @@ jobs: # autocommit tests fail on the ci for some reason - python-version: "pypy-3.10" pytest-args: "-k 'not test_autocommit_on and not test_turn_autocommit_off_via_default_iso_level and not test_autocommit_isolation_level'" - - os: "ubuntu-22.04" - pytest-args: "--dbdriver pysqlite --dbdriver aiosqlite" - - os: "ubuntu-22.04-arm" - pytest-args: "--dbdriver pysqlite --dbdriver aiosqlite" - exclude: - # linux does not have x86 / arm64 python + # cext-greenlet only runs on ubuntu x64 and arm64 + - build-type: "cext-greenlet" + os: "windows-latest" + - build-type: "cext-greenlet" + os: "windows-11-arm" + - build-type: "cext-greenlet" + os: "macos-latest" + - build-type: "cext-greenlet" + os: "macos-13" + + # the threaded pythons are not stable under greenlet. 
Even + # though we can run individual tests, when you run the whole suite + # with xdist and the greenlet wrapper, the workers keep crashing + # and getting replaced + - build-type: "cext-greenlet" + python-version: "3.13t" + + - build-type: "cext-greenlet" + python-version: "3.14t-dev" + + # skip py 3.14 on x64, because greenlet builds are not on + # pypi and these dont compile yet + - architecture: x64 + python-version: + - "3.14.0-alpha - 3.14" + - "3.14t-dev" + # linux do not have x86 / arm64 python - os: "ubuntu-22.04" architecture: x86 - os: "ubuntu-22.04" @@ -113,12 +140,6 @@ jobs: python-version: ${{ matrix.python-version }} architecture: ${{ matrix.architecture }} - - name: Remove greenlet - if: ${{ matrix.no-greenlet == 'true' }} - shell: pwsh - run: | - (cat setup.cfg) | %{$_ -replace "^\s*greenlet.+",""} | set-content setup.cfg - - name: Install dependencies run: | python -m pip install --upgrade pip @@ -126,7 +147,10 @@ jobs: pip list - name: Run tests - run: tox -e github-${{ matrix.build-type }} -- -q --nomemory --notimingintensive ${{ matrix.pytest-args }} + run: tox -e github-${{ matrix.build-type }} -- ${{ matrix.pytest-args }} + env: + # under free threading, make sure to disable GIL + PYTHON_GIL: ${{ contains(matrix.python-version, 't') && '0' || '' }} continue-on-error: ${{ matrix.python-version == 'pypy-3.10' }} run-tox: @@ -138,8 +162,6 @@ jobs: os: - "ubuntu-22.04" python-version: - - "3.10" - - "3.11" - "3.12" - "3.13" tox-env: diff --git a/doc/build/changelog/unreleased_20/12881.rst b/doc/build/changelog/unreleased_20/12881.rst new file mode 100644 index 0000000000..5b9a5fe908 --- /dev/null +++ b/doc/build/changelog/unreleased_20/12881.rst @@ -0,0 +1,14 @@ +.. change:: + :tags: bug, engine + :tickets: 12881 + + Implemented initial support for free-threaded Python by adding new tests + and reworking the test harness and GitHub Actions to include Python 3.13t + and Python 3.14t in test runs. Two concurrency issues have been identified + and fixed: the first involves initialization of the ``.c`` collection on a + ``FromClause``, a continuation of :ticket:`12302`, where an optional mutex + under free-threading is added; the second involves synchronization of the + pool "first_connect" event, which first received thread synchronization in + :ticket:`2964`, however under free-threading the creation of the mutex + itself runs under the same free-threading mutex. Initial pull request and + test suite courtesy Lysandros Nikolaou. diff --git a/lib/sqlalchemy/event/attr.py b/lib/sqlalchemy/event/attr.py index ec5d5822f1..ecea1045c9 100644 --- a/lib/sqlalchemy/event/attr.py +++ b/lib/sqlalchemy/event/attr.py @@ -343,11 +343,20 @@ class _EmptyListener(_InstanceLevelDispatch[_ET]): obj = cast("_Dispatch[_ET]", obj) assert obj._instance_cls is not None - result = _ListenerCollection(self.parent, obj._instance_cls) - if getattr(obj, self.name) is self: - setattr(obj, self.name, result) - else: - assert isinstance(getattr(obj, self.name), _JoinedListener) + existing = getattr(obj, self.name) + + with util.mini_gil: + if existing is self or isinstance(existing, _JoinedListener): + result = _ListenerCollection(self.parent, obj._instance_cls) + else: + # this codepath is an extremely rare race condition + # that has been observed in test_pool.py->test_timeout_race + # with freethreaded. 
+ assert isinstance(existing, _ListenerCollection) + return existing + + if existing is self: + setattr(obj, self.name, result) return result def _needs_modify(self, *args: Any, **kw: Any) -> NoReturn: @@ -409,7 +418,7 @@ class _CompoundListener(_InstanceLevelDispatch[_ET]): "_is_asyncio", ) - _exec_once_mutex: _MutexProtocol + _exec_once_mutex: Optional[_MutexProtocol] parent_listeners: Collection[_ListenerFnType] listeners: Collection[_ListenerFnType] _exec_once: bool @@ -422,16 +431,23 @@ class _CompoundListener(_InstanceLevelDispatch[_ET]): def _set_asyncio(self) -> None: self._is_asyncio = True - def _memoized_attr__exec_once_mutex(self) -> _MutexProtocol: - if self._is_asyncio: - return AsyncAdaptedLock() - else: - return threading.Lock() + def _get_exec_once_mutex(self) -> _MutexProtocol: + with util.mini_gil: + if self._exec_once_mutex is not None: + return self._exec_once_mutex + + if self._is_asyncio: + mutex = AsyncAdaptedLock() + else: + mutex = threading.Lock() # type: ignore[assignment] + self._exec_once_mutex = mutex + + return mutex def _exec_once_impl( self, retry_on_exception: bool, *args: Any, **kw: Any ) -> None: - with self._exec_once_mutex: + with self._get_exec_once_mutex(): if not self._exec_once: try: self(*args, **kw) @@ -472,13 +488,15 @@ class _CompoundListener(_InstanceLevelDispatch[_ET]): raised an exception. If _exec_w_sync_on_first_run was already called and didn't raise an - exception, then a mutex is not used. + exception, then a mutex is not used. It's not guaranteed + the mutex won't be used more than once in the case of very rare + race conditions. .. versionadded:: 1.4.11 """ if not self._exec_w_sync_once: - with self._exec_once_mutex: + with self._get_exec_once_mutex(): try: self(*args, **kw) except: @@ -540,6 +558,7 @@ class _ListenerCollection(_CompoundListener[_ET]): parent.update_subclass(target_cls) self._exec_once = False self._exec_w_sync_once = False + self._exec_once_mutex = None self.parent_listeners = parent._clslevel[target_cls] self.parent = parent self.name = parent.name @@ -617,6 +636,8 @@ class _JoinedListener(_CompoundListener[_ET]): local: _EmptyListener[_ET], ): self._exec_once = False + self._exec_w_sync_once = False + self._exec_once_mutex = None self.parent_dispatch = parent_dispatch self.name = name self.local = local diff --git a/lib/sqlalchemy/sql/selectable.py b/lib/sqlalchemy/sql/selectable.py index fc60436bbb..b0e7d544fa 100644 --- a/lib/sqlalchemy/sql/selectable.py +++ b/lib/sqlalchemy/sql/selectable.py @@ -907,25 +907,28 @@ class FromClause(roles.AnonymizedFromClauseRole, Selectable): return self._columns.as_readonly() def _setup_collections(self) -> None: - assert "_columns" not in self.__dict__ - assert "primary_key" not in self.__dict__ - assert "foreign_keys" not in self.__dict__ - - _columns: ColumnCollection[Any, Any] = ColumnCollection() - primary_key = ColumnSet() - foreign_keys: Set[KeyedColumnElement[Any]] = set() - - self._populate_column_collection( - columns=_columns, - primary_key=primary_key, - foreign_keys=foreign_keys, - ) + with util.mini_gil: + # detect another thread that raced ahead + if "_columns" in self.__dict__: + assert "primary_key" in self.__dict__ + assert "foreign_keys" in self.__dict__ + return + + _columns: ColumnCollection[Any, Any] = ColumnCollection() + primary_key = ColumnSet() + foreign_keys: Set[KeyedColumnElement[Any]] = set() + + self._populate_column_collection( + columns=_columns, + primary_key=primary_key, + foreign_keys=foreign_keys, + ) - # assigning these three collections 
separately is not itself atomic, - # but greatly reduces the surface for problems - self._columns = _columns - self.primary_key = primary_key # type: ignore - self.foreign_keys = foreign_keys # type: ignore + # assigning these three collections separately is not itself + # atomic, but greatly reduces the surface for problems + self._columns = _columns + self.primary_key = primary_key # type: ignore + self.foreign_keys = foreign_keys # type: ignore @util.ro_non_memoized_property def entity_namespace(self) -> _EntityNamespace: diff --git a/lib/sqlalchemy/testing/profiling.py b/lib/sqlalchemy/testing/profiling.py index 0d90947e44..66b6b00996 100644 --- a/lib/sqlalchemy/testing/profiling.py +++ b/lib/sqlalchemy/testing/profiling.py @@ -26,6 +26,7 @@ import sys from . import config from .util import gc_collect +from ..util import freethreading from ..util import has_compiled_ext @@ -99,6 +100,8 @@ class ProfileStatsFile: # keep it at 2.7, 3.1, 3.2, etc. for now. py_version = ".".join([str(v) for v in sys.version_info[0:2]]) + if freethreading: + py_version += "t" platform_tokens = [ platform.machine(), diff --git a/lib/sqlalchemy/testing/requirements.py b/lib/sqlalchemy/testing/requirements.py index b717cfc8ea..b90e8363af 100644 --- a/lib/sqlalchemy/testing/requirements.py +++ b/lib/sqlalchemy/testing/requirements.py @@ -1646,6 +1646,12 @@ class SuiteRequirements(Requirements): lambda: util.cpython, "cPython interpreter needed" ) + @property + def gil_enabled(self): + return exclusions.only_if( + lambda: not util.freethreading, "GIL-enabled build needed" + ) + @property def is64bit(self): return exclusions.only_if(lambda: util.is64bit, "64bit required") @@ -1668,7 +1674,7 @@ class SuiteRequirements(Requirements): gc.collect() is called, as well as clean out unreferenced subclasses. 
""" - return self.cpython + return self.cpython + self.gil_enabled @property def no_coverage(self): diff --git a/lib/sqlalchemy/testing/suite/test_reflection.py b/lib/sqlalchemy/testing/suite/test_reflection.py index 12ac5df1c5..eb4775ca0e 100644 --- a/lib/sqlalchemy/testing/suite/test_reflection.py +++ b/lib/sqlalchemy/testing/suite/test_reflection.py @@ -2698,6 +2698,25 @@ class ComponentReflectionTestExtra(ComparesIndexes, fixtures.TestBase): ], ) + def test_index_column_order(self, metadata, inspect_for_table): + """test for #12894""" + with inspect_for_table("sa_multi_index") as (schema, inspector): + test_table = Table( + "sa_multi_index", + metadata, + Column("Column1", Integer, primary_key=True), + Column("Column2", Integer), + Column("Column3", Integer), + ) + Index( + "Index_Example", + test_table.c.Column3, + test_table.c.Column1, + test_table.c.Column2, + ) + indexes = inspector.get_indexes("sa_multi_index") + eq_(indexes[0]["column_names"], ["Column3", "Column1", "Column2"]) + @testing.requires.indexes_with_expressions def test_reflect_expression_based_indexes(self, metadata, connection): t = Table( diff --git a/lib/sqlalchemy/util/__init__.py b/lib/sqlalchemy/util/__init__.py index 1ccebc47fc..55770c890a 100644 --- a/lib/sqlalchemy/util/__init__.py +++ b/lib/sqlalchemy/util/__init__.py @@ -56,10 +56,12 @@ from .compat import cpython as cpython from .compat import dataclass_fields as dataclass_fields from .compat import decode_backslashreplace as decode_backslashreplace from .compat import dottedgetter as dottedgetter +from .compat import freethreading as freethreading from .compat import has_refcount_gc as has_refcount_gc from .compat import inspect_getfullargspec as inspect_getfullargspec from .compat import is64bit as is64bit from .compat import local_dataclass_fields as local_dataclass_fields +from .compat import mini_gil as mini_gil from .compat import osx as osx from .compat import py310 as py310 from .compat import py311 as py311 diff --git a/lib/sqlalchemy/util/compat.py b/lib/sqlalchemy/util/compat.py index 2ee4703118..abbbd628f6 100644 --- a/lib/sqlalchemy/util/compat.py +++ b/lib/sqlalchemy/util/compat.py @@ -17,6 +17,7 @@ import inspect import operator import platform import sys +import sysconfig import typing from typing import Any from typing import Callable @@ -42,6 +43,7 @@ py39 = sys.version_info >= (3, 9) py38 = sys.version_info >= (3, 8) pypy = platform.python_implementation() == "PyPy" cpython = platform.python_implementation() == "CPython" +freethreading = bool(sysconfig.get_config_var("Py_GIL_DISABLED")) win32 = sys.platform.startswith("win") osx = sys.platform.startswith("darwin") @@ -301,3 +303,14 @@ def local_dataclass_fields(cls: Type[Any]) -> Iterable[dataclasses.Field[Any]]: return [f for f in dataclasses.fields(cls) if f not in super_fields] else: return [] + + +if freethreading: + import threading + + mini_gil = threading.RLock() + """provide a threading.RLock() under python freethreading only""" +else: + import contextlib + + mini_gil = contextlib.nullcontext() # type: ignore[assignment] diff --git a/lib/sqlalchemy/util/concurrency.py b/lib/sqlalchemy/util/concurrency.py index 006340f5bf..791fa8f834 100644 --- a/lib/sqlalchemy/util/concurrency.py +++ b/lib/sqlalchemy/util/concurrency.py @@ -6,6 +6,8 @@ # the MIT License: https://www.opensource.org/licenses/mit-license.php # mypy: allow-untyped-defs, allow-untyped-calls +"""asyncio-related concurrency functions.""" + from __future__ import annotations import asyncio # noqa diff --git 
a/lib/sqlalchemy/util/langhelpers.py b/lib/sqlalchemy/util/langhelpers.py index ebdd8ffa04..5a4d6495e2 100644 --- a/lib/sqlalchemy/util/langhelpers.py +++ b/lib/sqlalchemy/util/langhelpers.py @@ -1374,6 +1374,9 @@ class MemoizedSlots: This allows the functionality of memoized_property and memoized_instancemethod to be available to a class using __slots__. + The memoized get is not threadsafe under freethreading and the + creator method may in extremely rare cases be called more than once. + """ __slots__ = () @@ -1394,20 +1397,20 @@ class MemoizedSlots: setattr(self, key, value) return value elif hasattr(self.__class__, f"_memoized_method_{key}"): - fn = getattr(self, f"_memoized_method_{key}") + meth = getattr(self, f"_memoized_method_{key}") def oneshot(*args, **kw): - result = fn(*args, **kw) + result = meth(*args, **kw) def memo(*a, **kw): return result - memo.__name__ = fn.__name__ - memo.__doc__ = fn.__doc__ + memo.__name__ = meth.__name__ + memo.__doc__ = meth.__doc__ setattr(self, key, memo) return result - oneshot.__doc__ = fn.__doc__ + oneshot.__doc__ = meth.__doc__ return oneshot else: return self._fallback_getattr(key) diff --git a/test/aaa_profiling/test_memusage.py b/test/aaa_profiling/test_memusage.py index b6745e8b0b..ebcc41cd2e 100644 --- a/test/aaa_profiling/test_memusage.py +++ b/test/aaa_profiling/test_memusage.py @@ -1837,83 +1837,103 @@ class MiscMemoryIntensiveTests(fixtures.TestBase): class WeakIdentityMapTest(_fixtures.FixtureTest): run_inserts = None + def run_up_to_n_times(self, fn, times): + error = None + for _ in range(times): + try: + fn() + except Exception as err: + error = err + continue + else: + break + else: + if error: + raise error + @testing.requires.predictable_gc def test_weakref(self): """test the weak-referencing identity map, which strongly- references modified items.""" users, User = self.tables.users, self.classes.User - - s = fixture_session() self.mapper_registry.map_imperatively(User, users) - gc_collect() - s.add(User(name="ed")) - s.flush() - assert not s.dirty + def go(): + with Session(testing.db) as s: + gc_collect() - user = s.query(User).one() + s.add(User(name="ed")) + s.flush() + assert not s.dirty - # heisenberg the GC a little bit, since #7823 caused a lot more - # GC when mappings are set up, larger test suite started failing - # on this being gc'ed - user_is = user._sa_instance_state - del user - gc_collect() - gc_collect() - gc_collect() - assert user_is.obj() is None + user = s.query(User).one() - assert len(s.identity_map) == 0 + # heisenberg the GC a little bit, since #7823 caused a lot more + # GC when mappings are set up, larger test suite started + # failing on this being gc'ed + user_is = user._sa_instance_state + del user + gc_collect() + gc_collect() + gc_collect() + assert user_is.obj() is None - user = s.query(User).one() - user.name = "fred" - del user - gc_collect() - assert len(s.identity_map) == 1 - assert len(s.dirty) == 1 - assert None not in s.dirty - s.flush() - gc_collect() - assert not s.dirty - assert not s.identity_map + assert len(s.identity_map) == 0 - user = s.query(User).one() - assert user.name == "fred" - assert s.identity_map + user = s.query(User).one() + user.name = "fred" + del user + gc_collect() + assert len(s.identity_map) == 1 + assert len(s.dirty) == 1 + assert None not in s.dirty + s.flush() + gc_collect() + assert not s.dirty + assert not s.identity_map + + user = s.query(User).one() + assert user.name == "fred" + assert s.identity_map + + self.run_up_to_n_times(go, 10) 
@testing.requires.predictable_gc def test_weakref_pickled(self): users, User = self.tables.users, pickleable.User - - s = fixture_session() self.mapper_registry.map_imperatively(User, users) - gc_collect() - s.add(User(name="ed")) - s.flush() - assert not s.dirty + def go(): + with Session(testing.db) as s: + gc_collect() - user = s.query(User).one() - user.name = "fred" - s.expunge(user) + s.add(User(name="ed")) + s.flush() + assert not s.dirty - u2 = pickle.loads(pickle.dumps(user)) + user = s.query(User).one() + user.name = "fred" + s.expunge(user) - del user - s.add(u2) + u2 = pickle.loads(pickle.dumps(user)) - del u2 - gc_collect() + del user + s.add(u2) - assert len(s.identity_map) == 1 - assert len(s.dirty) == 1 - assert None not in s.dirty - s.flush() - gc_collect() - assert not s.dirty + del u2 + gc_collect() - assert not s.identity_map + assert len(s.identity_map) == 1 + assert len(s.dirty) == 1 + assert None not in s.dirty + s.flush() + gc_collect() + assert not s.dirty + + assert not s.identity_map + + self.run_up_to_n_times(go, 10) @testing.requires.predictable_gc def test_weakref_with_cycles_o2m(self): @@ -1924,7 +1944,6 @@ class WeakIdentityMapTest(_fixtures.FixtureTest): self.classes.User, ) - s = fixture_session() self.mapper_registry.map_imperatively( User, users, @@ -1933,27 +1952,39 @@ class WeakIdentityMapTest(_fixtures.FixtureTest): self.mapper_registry.map_imperatively(Address, addresses) gc_collect() - s.add(User(name="ed", addresses=[Address(email_address="ed1")])) - s.commit() + def go(): + with Session(testing.db) as s: + s.add( + User(name="ed", addresses=[Address(email_address="ed1")]) + ) + s.commit() - user = s.query(User).options(joinedload(User.addresses)).one() - user.addresses[0].user # lazyload - eq_(user, User(name="ed", addresses=[Address(email_address="ed1")])) + user = s.query(User).options(joinedload(User.addresses)).one() + user.addresses[0].user # lazyload + eq_( + user, + User(name="ed", addresses=[Address(email_address="ed1")]), + ) - del user - gc_collect() - assert len(s.identity_map) == 0 + del user + gc_collect() + assert len(s.identity_map) == 0 - user = s.query(User).options(joinedload(User.addresses)).one() - user.addresses[0].email_address = "ed2" - user.addresses[0].user # lazyload - del user - gc_collect() - assert len(s.identity_map) == 2 + user = s.query(User).options(joinedload(User.addresses)).one() + user.addresses[0].email_address = "ed2" + user.addresses[0].user # lazyload + del user + gc_collect() + assert len(s.identity_map) == 2 - s.commit() - user = s.query(User).options(joinedload(User.addresses)).one() - eq_(user, User(name="ed", addresses=[Address(email_address="ed2")])) + s.commit() + user = s.query(User).options(joinedload(User.addresses)).one() + eq_( + user, + User(name="ed", addresses=[Address(email_address="ed2")]), + ) + + self.run_up_to_n_times(go, 10) @testing.requires.predictable_gc def test_weakref_with_cycles_o2o(self): diff --git a/test/aaa_profiling/test_threading.py b/test/aaa_profiling/test_threading.py new file mode 100644 index 0000000000..9aecdfdd46 --- /dev/null +++ b/test/aaa_profiling/test_threading.py @@ -0,0 +1,291 @@ +import random +import threading +import time + +import sqlalchemy as sa +from sqlalchemy import Integer +from sqlalchemy import MetaData +from sqlalchemy import String +from sqlalchemy import testing +from sqlalchemy.orm import scoped_session +from sqlalchemy.orm import sessionmaker +from sqlalchemy.testing import eq_ +from sqlalchemy.testing import fixtures +from 
sqlalchemy.testing.schema import Column +from sqlalchemy.testing.schema import Table + +NUM_THREADS = 10 +ITERATIONS = 10 + + +class _ThreadTest: + def run_threaded( + self, + func, + *thread_args, + nthreads=NUM_THREADS, + use_barrier=False, + **thread_kwargs, + ): + barrier = threading.Barrier(nthreads) + results = [] + errors = [] + + def thread_func(*args, **kwargs): + thread_name = threading.current_thread().name + if use_barrier: + barrier.wait() + + local_result = [] + try: + func(local_result, thread_name, *args, **kwargs) + results.append(tuple(local_result)) + except Exception as e: + # raise + errors.append((thread_name, repr(e))) + + threads = [ + threading.Thread( + name=f"thread-{i}", + target=thread_func, + args=thread_args, + kwargs=thread_kwargs, + ) + for i in range(nthreads) + ] + + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + return results, errors + + @testing.fixture + def num_threads_engine(self, testing_engine): + return testing_engine(options=dict(pool_size=NUM_THREADS)) + + +@testing.add_to_marker.timing_intensive +class EngineThreadSafetyTest(_ThreadTest, fixtures.TablesTest): + run_dispose_bind = "once" + + __requires__ = ("multithreading_support",) + + @classmethod + def define_tables(cls, metadata): + Table( + "test_table", + metadata, + Column( + "id", Integer, primary_key=True, test_needs_autoincrement=True + ), + Column("thread_id", Integer), + Column("data", String(50)), + ) + + @testing.combinations( + (NUM_THREADS, 0), + (3, 5), + (3, 0), + (7, 0), + argnames="pool_size, max_overflow", + ) + def test_engine_thread_safe(self, testing_engine, pool_size, max_overflow): + """Test that a single Engine can be safely shared across threads.""" + test_table = self.tables.test_table + + engine = testing_engine( + options=dict(pool_size=pool_size, max_overflow=max_overflow) + ) + + def worker(results, thread_name): + for _ in range(ITERATIONS): + with engine.connect() as conn: + conn.execute( + test_table.insert(), + {"data": thread_name}, + ) + conn.commit() + + result = conn.execute( + sa.select(test_table.c.data).where( + test_table.c.data == thread_name + ) + ).scalar() + results.append(result) + + results, errors = self.run_threaded(worker) + + eq_(errors, []) + eq_( + set(results), + { + tuple([f"thread-{i}" for j in range(ITERATIONS)]) + for i in range(NUM_THREADS) + }, + ) + + def test_metadata_thread_safe(self, num_threads_engine): + """Test that MetaData objects are thread-safe for reads.""" + metadata = sa.MetaData() + + for thread_id in range(NUM_THREADS): + Table( + f"thread-{thread_id}", + metadata, + Column("id", Integer, primary_key=True), + Column("data", String(50)), + ) + + metadata.create_all(testing.db) + + def worker(results, thread_name): + table_key = thread_name + assert table_key in metadata.tables, f"{table_key} does not exist" + with num_threads_engine.connect() as conn: + # Will raise if it cannot connect so erros will be populated + conn.execute(sa.select(metadata.tables[table_key])) + + _, errors = self.run_threaded(worker) + eq_(errors, []) + + +@testing.add_to_marker.timing_intensive +class SessionThreadingTest(_ThreadTest, fixtures.MappedTest): + run_dispose_bind = "once" + + __requires__ = ("multithreading_support",) + + @classmethod + def define_tables(cls, metadata): + Table( + "users", + metadata, + Column( + "id", Integer, primary_key=True, test_needs_autoincrement=True + ), + Column("name", String(50)), + Column("thread_id", String(50)), + ) + + @classmethod + def setup_classes(cls): + 
class User(cls.Comparable): + pass + + def test_sessionmaker_thread_safe(self, num_threads_engine): + """Test that sessionmaker factory is thread-safe.""" + users, User = self.tables.users, self.classes.User + self.mapper_registry.map_imperatively(User, users) + + # Single sessionmaker shared across threads + SessionFactory = sessionmaker(num_threads_engine) + + def worker(results, thread_name): + thread_id = thread_name + + for _ in range(ITERATIONS): + with SessionFactory() as session: + for i in range(3): + user = User( + name=f"user_{thread_id}_{i}", thread_id=thread_id + ) + session.add(user) + session.commit() + + count = ( + session.query(User) + .filter_by(thread_id=thread_id) + .count() + ) + results.append(count) + + results, errors = self.run_threaded(worker) + + eq_(errors, []) + eq_( + results, + [ + tuple(range(3, 3 * ITERATIONS + 3, 3)) + for _ in range(NUM_THREADS) + ], + ) + + def test_scoped_session_thread_local(self, num_threads_engine): + """Test that scoped_session provides thread-local sessions.""" + users, User = self.tables.users, self.classes.User + self.mapper_registry.map_imperatively(User, users) + + # Create scoped session + Session = scoped_session(sessionmaker(num_threads_engine)) + + session_ids = {} + + def worker(results, thread_name): + thread_id = thread_name + + session = Session() + session_ids[thread_id] = id(session) + session.close() + + for _ in range(ITERATIONS): + user = User( + name=f"scoped_user_{thread_id}", thread_id=thread_id + ) + Session.add(user) + Session.commit() + + session2 = Session() + assert id(session2) == session_ids[thread_id] + session2.close() + + count = ( + Session.query(User).filter_by(thread_id=thread_id).count() + ) + results.append(count) + Session.remove() + + results, errors = self.run_threaded(worker) + + eq_(errors, []) + unique_sessions = set(session_ids.values()) + eq_(len(unique_sessions), NUM_THREADS) + eq_( + results, + [tuple(range(1, ITERATIONS + 1)) for _ in range(NUM_THREADS)], + ) + + +@testing.add_to_marker.timing_intensive +class FromClauseConcurrencyTest(_ThreadTest, fixtures.TestBase): + """test for issue #12302""" + + @testing.variation("collection", ["c", "primary_key", "foreign_keys"]) + def test_c_collection(self, collection): + dictionary_meta = MetaData() + all_indexes_table = Table( + "all_indexes", + dictionary_meta, + *[Column(f"col{i}", Integer) for i in range(50)], + ) + + def use_table(results, errors): + for i in range(3): + time.sleep(random.random() * 0.0001) + if collection.c: + all_indexes.c.col35 + elif collection.primary_key: + all_indexes.primary_key + elif collection.foreign_keys: + all_indexes.foreign_keys + + for j in range(1000): + all_indexes = all_indexes_table.alias("a_indexes") + + results, errors = self.run_threaded( + use_table, use_barrier=False, nthreads=5 + ) + + eq_(errors, []) + eq_(len(results), 5) diff --git a/test/base/test_events.py b/test/base/test_events.py index ccb53f2bb3..cc21708b77 100644 --- a/test/base/test_events.py +++ b/test/base/test_events.py @@ -854,6 +854,7 @@ class SubclassGrowthTest(TearDownLocalEventsFixture, fixtures.TestBase): self.Target = Target + @testing.requires.predictable_gc def test_subclass(self): class SubTarget(self.Target): pass diff --git a/test/base/test_tutorials.py b/test/base/test_tutorials.py index d86322e12e..95b2a03374 100644 --- a/test/base/test_tutorials.py +++ b/test/base/test_tutorials.py @@ -177,6 +177,9 @@ class DocTest(fixtures.TestBase): def test_orm_quickstart(self): self._run_doctest("orm/quickstart.rst") + # this 
crashes on 3.13t but passes on 3.14t. + # just requiring non-freethreaded for now + @requires.gil_enabled @requires.greenlet def test_asyncio(self): try: diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py index 49736df9b6..e876c0419c 100644 --- a/test/engine/test_pool.py +++ b/test/engine/test_pool.py @@ -967,6 +967,7 @@ class PoolFirstConnectSyncTest(PoolTestBase): evt.connect() def checkout(): + barrier.wait() for j in range(2): c1 = pool.connect() time.sleep(0.02) @@ -981,6 +982,7 @@ class PoolFirstConnectSyncTest(PoolTestBase): # any of the connections get returned. so first_connect() # sleeps for one second, then pings the mock. the threads should # not have made it to the "checkout() event for that one second. + barrier = threading.Barrier(5) for i in range(5): th = threading.Thread(target=checkout) th.start() @@ -1112,6 +1114,7 @@ class QueuePoolTest(PoolTestBase): timeouts = [] def checkout(): + barrier.wait() for x in range(1): now = time.time() try: @@ -1122,6 +1125,7 @@ class QueuePoolTest(PoolTestBase): time.sleep(4) c1.close() + barrier = threading.Barrier(10) threads = [] for i in range(10): th = threading.Thread(target=checkout) diff --git a/test/profiles.txt b/test/profiles.txt index adb61d1e43..1c880822b2 100644 --- a/test/profiles.txt +++ b/test/profiles.txt @@ -1,15 +1,15 @@ # /home/classic/dev/sqlalchemy/test/profiles.txt # This file is written out on a per-environment basis. -# For each test in aaa_profiling, the corresponding function and +# For each test in aaa_profiling, the corresponding function and # environment is located within this file. If it doesn't exist, # the test is skipped. -# If a callcount does exist, it is compared to what we received. +# If a callcount does exist, it is compared to what we received. # assertions are raised if the counts do not match. -# -# To add a new callcount test, apply the function_call_count -# decorator and re-run the tests using the --write-profiles +# +# To add a new callcount test, apply the function_call_count +# decorator and re-run the tests using the --write-profiles # option - this file will be rewritten including the new count. 
-# +# # TEST: test.aaa_profiling.test_compiler.CompileTest.test_insert @@ -436,10 +436,10 @@ test.aaa_profiling.test_orm.WithExpresionLoaderOptTest.test_from_opt_no_cache x8 # TEST: test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect -test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_cextensions 78 -test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_nocextensions 78 -test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_cextensions 79 -test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_nocextensions 79 +test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_cextensions 72 +test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_nocextensions 72 +test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_cextensions 73 +test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_nocextensions 73 # TEST: test.aaa_profiling.test_pool.QueuePoolTest.test_second_connect diff --git a/test/requirements.py b/test/requirements.py index 66c0fb174f..16d4db465e 100644 --- a/test/requirements.py +++ b/test/requirements.py @@ -408,7 +408,13 @@ class DefaultRequirements(SuiteRequirements): gc.collect() is called, as well as clean out unreferenced subclasses. """ - return self.cpython + skip_if("+aiosqlite") + return self.cpython + self.gil_enabled + skip_if("+aiosqlite") + + @property + def multithreading_support(self): + """target platform allows use of threads without any problem""" + + return skip_if("+aiosqlite") + skip_if(self.sqlite_memory) @property def memory_process_intensive(self): diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py index 6183181bc4..e69b09dfb8 100644 --- a/test/sql/test_selectable.py +++ b/test/sql/test_selectable.py @@ -1,9 +1,6 @@ """Test various algorithmic properties of selectables.""" from itertools import zip_longest -import random -import threading -import time from sqlalchemy import and_ from sqlalchemy import bindparam @@ -4079,39 +4076,3 @@ class AliasTest(fixtures.TestBase, AssertsCompiledSQL): a3 = a2._clone() a3._copy_internals() is_(a1.corresponding_column(a3.c.c), a1.c.c) - - -class FromClauseConcurrencyTest(fixtures.TestBase): - """test for issue 12302""" - - @testing.requires.timing_intensive - def test_c_collection(self): - dictionary_meta = MetaData() - all_indexes_table = Table( - "all_indexes", - dictionary_meta, - *[Column(f"col{i}", Integer) for i in range(50)], - ) - - fails = 0 - - def use_table(): - nonlocal fails - try: - for i in range(3): - time.sleep(random.random() * 0.0001) - all_indexes.c.col35 - except: - fails += 1 - raise - - for j in range(1000): - all_indexes = all_indexes_table.alias("a_indexes") - - threads = [threading.Thread(target=use_table) for i in range(5)] - for t in threads: - t.start() - for t in threads: - t.join() - - assert not fails, "one or more runs failed" diff --git a/tox.ini b/tox.ini index 40bb95cca7..7af3f527ea 100644 --- a/tox.ini +++ b/tox.ini @@ -7,10 +7,15 @@ extras= asyncio sqlite: aiosqlite sqlite_file: aiosqlite - postgresql: postgresql_asyncpg + + # asyncpg doesnt build on free threading 
backends + py{38,39,310,311,312,313,314}-postgresql: postgresql_asyncpg + mysql: asyncmy mysql: aiomysql - mssql: aioodbc + + # aioodbc builds on free threading but segfaults once you use it + py{38,39,310,311,312,313,314}-mssql: aioodbc [testenv] cov_args=--cov=sqlalchemy --cov-report term --cov-append --cov-report xml --exclude-tag memory-intensive --exclude-tag timing-intensive -k "not aaa_profiling" @@ -27,7 +32,9 @@ extras= # this can be limited to specific python versions IF there is no # greenlet available for the most recent python. otherwise # keep this present in all cases - py{38,39,310,311,312,313,314}: {[greenletextras]extras} + # py{38,39,310,311,312,313,314,313t,314t}: {[greenletextras]extras} + + {[greenletextras]extras} postgresql: postgresql postgresql: postgresql_pg8000 @@ -114,6 +121,8 @@ setenv= WORKERS={env:TOX_WORKERS:-n4 --max-worker-restart=5} + {py313t,py314t}: PYTHON_GIL=0 # when running with 313t, 314t, go for broke + # affect setup.py to skip or build the cython extensions. nocext: DISABLE_SQLALCHEMY_CEXT=1 cext: REQUIRE_SQLALCHEMY_CEXT=1 @@ -129,28 +138,30 @@ setenv= oracle: WORKERS={env:TOX_WORKERS:-n2 --max-worker-restart=5} oracle: ORACLE={env:TOX_ORACLE:--db oracle} oracle: EXTRA_ORACLE_DRIVERS={env:EXTRA_ORACLE_DRIVERS:--dbdriver cx_oracle --dbdriver oracledb --dbdriver oracledb_async} + oracle-nogreenlet: EXTRA_ORACLE_DRIVERS={env:EXTRA_ORACLE_DRIVERS:--dbdriver cx_oracle --dbdriver oracledb} sqlite: SQLITE={env:TOX_SQLITE:--db sqlite} - sqlite-py{38,39,310,311,312,313}: EXTRA_SQLITE_DRIVERS={env:EXTRA_SQLITE_DRIVERS:--dbdriver sqlite --dbdriver pysqlite_numeric --dbdriver aiosqlite} - sqlite-{py314,nogreenlet}: EXTRA_SQLITE_DRIVERS={env:EXTRA_SQLITE_DRIVERS:--dbdriver sqlite --dbdriver pysqlite_numeric} + sqlite-py{38,39,310,311,312,313,314}: EXTRA_SQLITE_DRIVERS={env:EXTRA_SQLITE_DRIVERS:--dbdriver sqlite --dbdriver pysqlite_numeric --dbdriver aiosqlite} + sqlite-nogreenlet: EXTRA_SQLITE_DRIVERS={env:EXTRA_SQLITE_DRIVERS:--dbdriver sqlite --dbdriver pysqlite_numeric} sqlite_file: SQLITE={env:TOX_SQLITE_FILE:--db sqlite_file} sqlite_file: EXTRA_SQLITE_DRIVERS={env:EXTRA_SQLITE_DRIVERS:--dbdriver sqlite --dbdriver aiosqlite} - sqlite_file-{py314,nogreenlet}: EXTRA_SQLITE_DRIVERS={env:EXTRA_SQLITE_DRIVERS:--dbdriver sqlite} + sqlite_file-nogreenlet: EXTRA_SQLITE_DRIVERS={env:EXTRA_SQLITE_DRIVERS:--dbdriver sqlite} postgresql: POSTGRESQL={env:TOX_POSTGRESQL:--db postgresql} postgresql: EXTRA_PG_DRIVERS={env:EXTRA_PG_DRIVERS:--dbdriver psycopg2 --dbdriver asyncpg --dbdriver pg8000 --dbdriver psycopg --dbdriver psycopg_async} postgresql-nogreenlet: EXTRA_PG_DRIVERS={env:EXTRA_PG_DRIVERS:--dbdriver psycopg2 --dbdriver pg8000 --dbdriver psycopg} + postgresql-{py313t,py314t}: EXTRA_PG_DRIVERS={env:EXTRA_PG_DRIVERS:--dbdriver psycopg2 --dbdriver pg8000 --dbdriver psycopg --dbdriver psycopg_async} mysql: MYSQL={env:TOX_MYSQL:--db mysql} mysql: EXTRA_MYSQL_DRIVERS={env:EXTRA_MYSQL_DRIVERS:--dbdriver mysqldb --dbdriver pymysql --dbdriver asyncmy --dbdriver aiomysql --dbdriver mariadbconnector} mysql-nogreenlet: EXTRA_MYSQL_DRIVERS={env:EXTRA_MYSQL_DRIVERS:--dbdriver mysqldb --dbdriver pymysql --dbdriver mariadbconnector} + # for mssql, aioodbc frequently segfaults under free-threaded builds mssql: MSSQL={env:TOX_MSSQL:--db mssql} mssql: EXTRA_MSSQL_DRIVERS={env:EXTRA_MSSQL_DRIVERS:--dbdriver pyodbc --dbdriver aioodbc --dbdriver pymssql} mssql-py314: EXTRA_MSSQL_DRIVERS={env:EXTRA_MSSQL_DRIVERS:--dbdriver pyodbc --dbdriver aioodbc} - mssql-nogreenlet: 
EXTRA_MSSQL_DRIVERS={env:EXTRA_MSSQL_DRIVERS:--dbdriver pyodbc --dbdriver pymssql}
-    mssql-py314-nogreenlet: EXTRA_MSSQL_DRIVERS={env:EXTRA_MSSQL_DRIVERS:--dbdriver pyodbc}
+    mssql-{py313t,py314t,nogreenlet}: EXTRA_MSSQL_DRIVERS={env:EXTRA_MSSQL_DRIVERS:--dbdriver pyodbc --dbdriver pymssql}

     oracle,mssql,sqlite_file: IDENTS=--write-idents db_idents.txt

@@ -175,6 +186,7 @@ passenv=
     EXTRA_PG_DRIVERS
     EXTRA_MYSQL_DRIVERS
     EXTRA_ORACLE_DRIVERS
+    PYTHON_GIL

 commands=

@@ -293,23 +305,54 @@ extras =
             {[testenv:lint]extras}

 # command run in the github action when cext are active.
-[testenv:github-cext]
+# unfortunately it seems impossible to use generative tags with envs
+# that are not the default. so in the interim, build out three separate
+# envs with explicit names
+
+[testenv:githubbase]
 extras=
-     {[greenletextras]extras}
+setenv=
+    PYTHONNOUSERSITE=1
+
+    WORKERS={env:TOX_WORKERS:-n4 --max-worker-restart=5}
+    PYTEST_COLOR={tty:--color=yes}
+    PYTEST_EXCLUDES=-m "not memory_intensive and not mypy and not timing_intensive"
+    SQLITE=--db sqlite

-deps = {[testenv]deps}
-       .[aiosqlite]
 commands=
-    python -m pytest {env:PYTEST_COLOR} {env:WORKERS} {env:SQLITE:} {env:POSTGRESQL:} {env:MYSQL:} {env:ORACLE:} {env:MSSQL:} {env:IDENTS:} {env:PYTEST_EXCLUDES:} {env:COVERAGE:} {posargs}
+    python -m pytest {env:PYTEST_COLOR} {env:WORKERS} {env:PYTEST_EXCLUDES:} {env:SQLITE} {posargs}
     oracle,mssql,sqlite_file: python reap_dbs.py db_idents.txt

-# command run in the github action when cext are not active.
-[testenv:github-nocext]
+[testenv:github-cext-greenlet]
 extras=
-     {[greenletextras]extras}
+    asyncio
+    aiosqlite
+setenv=
+    {[testenv:githubbase]setenv}
+    REQUIRE_SQLALCHEMY_CEXT=1
+    SQLITE=--db sqlite --db aiosqlite
+commands=
+    python -m pytest {env:PYTEST_COLOR} {env:WORKERS} {env:PYTEST_EXCLUDES:} {env:SQLITE} {posargs}
+    oracle,mssql,sqlite_file: python reap_dbs.py db_idents.txt
+
+[testenv:github-cext]
+extras=
+setenv=
+    {[testenv:githubbase]setenv}
+    REQUIRE_SQLALCHEMY_CEXT=1
+deps = {[testenv:githubbase]deps}
+commands=
+    python -m pytest {env:PYTEST_COLOR} {env:WORKERS} {env:PYTEST_EXCLUDES:} {env:SQLITE} {posargs}
+    oracle,mssql,sqlite_file: python reap_dbs.py db_idents.txt

-deps = {[testenv]deps}
-       .[aiosqlite]
+
+[testenv:github-nocext]
+extras=
+setenv=
+    {[testenv:githubbase]setenv}
+    DISABLE_SQLALCHEMY_CEXT=1
+deps = {[testenv:githubbase]deps}
 commands=
-    python -m pytest {env:PYTEST_COLOR} {env:WORKERS} {env:SQLITE:} {env:POSTGRESQL:} {env:MYSQL:} {env:ORACLE:} {env:MSSQL:} {env:IDENTS:} {env:PYTEST_EXCLUDES:} {env:COVERAGE:} {posargs}
+    python -m pytest {env:PYTEST_COLOR} {env:WORKERS} {env:PYTEST_EXCLUDES:} {env:SQLITE} {posargs}
     oracle,mssql,sqlite_file: python reap_dbs.py db_idents.txt
+
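
The sketch below condenses the synchronization pattern that this patch applies in ``util.compat.mini_gil``, ``_CompoundListener._get_exec_once_mutex()`` and ``FromClause._setup_collections()``: a process-wide lock that is a real ``threading.RLock`` only on free-threaded builds (and a no-op context manager otherwise), used to serialize the lazy creation of per-instance state. This is a minimal standalone illustration, not SQLAlchemy code; the ``Resource`` class and ``exec_once`` method are hypothetical names used only for this example.

    import contextlib
    import sysconfig
    import threading

    # Py_GIL_DISABLED is set on free-threaded (PEP 703) builds; there we
    # need a real re-entrant lock.  On GIL builds a nullcontext() keeps
    # the "with mini_gil:" blocks free of overhead.
    if bool(sysconfig.get_config_var("Py_GIL_DISABLED")):
        mini_gil = threading.RLock()
    else:
        mini_gil = contextlib.nullcontext()


    class Resource:
        """Hypothetical object that creates its own mutex lazily."""

        def __init__(self):
            # created on first use; under free-threading the creation
            # itself must be synchronized, otherwise two threads can
            # each see None here and lock two different mutexes
            self._mutex = None
            self._done = False

        def _get_mutex(self):
            with mini_gil:
                if self._mutex is None:
                    self._mutex = threading.Lock()
                return self._mutex

        def exec_once(self, fn, *args, **kw):
            # mirrors the shape of _exec_once_impl above: the flag
            # check and the call both run under the lazily-created lock
            with self._get_mutex():
                if not self._done:
                    fn(*args, **kw)
                    self._done = True

For example, ``r = Resource(); r.exec_once(print, "hello")`` prints only once no matter how many threads race into ``exec_once``. The same shape appears twice in the diff: ``_get_exec_once_mutex()`` serializes creation of the mutex that guards the pool "first_connect" event, while ``_setup_collections()`` takes ``mini_gil`` directly so a thread that raced ahead is detected and the ``.c`` collections are built only once.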