python:
- "cp310-* cp311-*"
- "cp312-* cp313-*"
+ - "cp313t-*"
wheel_mode:
- compiled
os:
- name: Build compiled wheels
if: ${{ matrix.wheel_mode == 'compiled' }}
- uses: pypa/cibuildwheel@v2.22.0
+ uses: pypa/cibuildwheel@v3.2.0
env:
CIBW_ARCHS_LINUX: ${{ matrix.linux_archs }}
CIBW_BUILD: ${{ matrix.python }}
+ CIBW_ENABLE: ${{ matrix.python == 'cp313t-*' && 'cpython-freethreading' || '' }}
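+ # "cpython-freethreading" must be enabled for cibuildwheel to build
+ # the cp313t free-threaded wheels selected above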
# setting it here does not work on linux
# PYTHONNOUSERSITE: "1"
python-version:
- "3.13"
build-type:
+ - "cext-greenlet"
- "cext"
- "nocext"
architecture:
pip list
- name: Run tests
- run: tox -e github-${{ matrix.build-type }} -- -q --nomemory --notimingintensive ${{ matrix.pytest-args }}
+ run: tox -e github-${{ matrix.build-type }} -- ${{ matrix.pytest-args }}
run-tox:
name: ${{ matrix.tox-env }}-${{ matrix.python-version }}
- "3.11"
- "3.12"
- "3.13"
+ - "3.13t"
- "3.14.0-alpha - 3.14"
+ - "3.14t-dev"
- "pypy-3.10"
build-type:
+ # builds greenlet, runs asyncio tests; includes the aiosqlite driver
+ - "cext-greenlet"
+
+ # these do not install greenlet at all and skip asyncio tests;
+ # the aiosqlite driver is not included
- "cext"
- "nocext"
architecture:
# autocommit tests fail on the ci for some reason
- python-version: "pypy-3.10"
pytest-args: "-k 'not test_autocommit_on and not test_turn_autocommit_off_via_default_iso_level and not test_autocommit_isolation_level'"
- - os: "ubuntu-22.04"
- pytest-args: "--dbdriver pysqlite --dbdriver aiosqlite"
- - os: "ubuntu-22.04-arm"
- pytest-args: "--dbdriver pysqlite --dbdriver aiosqlite"
-
exclude:
+ # cext-greenlet only runs on ubuntu x64 and arm64
+ - build-type: "cext-greenlet"
+ os: "windows-latest"
+ - build-type: "cext-greenlet"
+ os: "windows-11-arm"
+ - build-type: "cext-greenlet"
+ os: "macos-latest"
+ - build-type: "cext-greenlet"
+ os: "macos-13"
+
+ # the free-threaded pythons are not stable under greenlet. Even
+ # though individual tests can be run, running the whole suite
+ # with xdist and the greenlet wrapper causes the workers to keep
+ # crashing and getting replaced
+ - build-type: "cext-greenlet"
+ python-version: "3.13t"
+
+ - build-type: "cext-greenlet"
+ python-version: "3.14t-dev"
+
+ # skip py 3.14 on x64, because greenlet builds are not on
+ # PyPI and these don't compile yet
+ - architecture: x64
+ python-version:
+ - "3.14.0-alpha - 3.14"
+ - "3.14t-dev"
# linux do not have x86 / arm64 python
- os: "ubuntu-22.04"
architecture: x86
python-version: ${{ matrix.python-version }}
architecture: ${{ matrix.architecture }}
- - name: Remove greenlet
- if: ${{ matrix.no-greenlet == 'true' }}
- shell: pwsh
- run: |
- (cat setup.cfg) | %{$_ -replace "^\s*greenlet.+",""} | set-content setup.cfg
-
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip list
- name: Run tests
- run: tox -e github-${{ matrix.build-type }} -- -q --nomemory --notimingintensive ${{ matrix.pytest-args }}
+ run: tox -e github-${{ matrix.build-type }} -- ${{ matrix.pytest-args }}
+ env:
+ # under free-threading, make sure the GIL stays disabled
+ PYTHON_GIL: ${{ contains(matrix.python-version, 't') && '0' || '' }}
continue-on-error: ${{ matrix.python-version == 'pypy-3.10' }}
run-tox:
os:
- "ubuntu-22.04"
python-version:
- - "3.10"
- - "3.11"
- "3.12"
- "3.13"
tox-env:
--- /dev/null
+.. change::
+ :tags: bug, engine
+ :tickets: 12881
+
+ Implemented initial support for free-threaded Python by adding new tests
+ and reworking the test harness and GitHub Actions to include Python 3.13t
+ and Python 3.14t in test runs. Two concurrency issues were identified and
+ fixed: the first involves initialization of the ``.c`` collection on a
+ ``FromClause``, a continuation of :ticket:`12302`, where an optional mutex
+ used only under free-threading is added; the second involves
+ synchronization of the pool "first_connect" event, which first received
+ thread synchronization in :ticket:`2964`; under free-threading, creation
+ of that mutex itself is now guarded by the same free-threading-only
+ mutex. Initial pull request and test suite courtesy Lysandros Nikolaou.
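+
+ Both fixes use the same approach, sketched here for illustration only
+ (the actual definition lives in ``sqlalchemy.util.compat``): a
+ process-wide lock that is a real lock only on free-threaded builds and
+ a no-op context manager otherwise::
+
+     # illustrative sketch, not the verbatim library code
+     if freethreading:
+         mini_gil = threading.RLock()
+     else:
+         mini_gil = contextlib.nullcontext()
+
+     with mini_gil:
+         ...  # one-time initialization that must not race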
obj = cast("_Dispatch[_ET]", obj)
assert obj._instance_cls is not None
- result = _ListenerCollection(self.parent, obj._instance_cls)
- if getattr(obj, self.name) is self:
- setattr(obj, self.name, result)
- else:
- assert isinstance(getattr(obj, self.name), _JoinedListener)
+ existing = getattr(obj, self.name)
+
+ with util.mini_gil:
+ if existing is self or isinstance(existing, _JoinedListener):
+ result = _ListenerCollection(self.parent, obj._instance_cls)
+ else:
+ # this codepath is an extremely rare race condition
+ # that has been observed in test_pool.py->test_timeout_race
+ # under free-threaded builds.
+ assert isinstance(existing, _ListenerCollection)
+ return existing
+
+ if existing is self:
+ setattr(obj, self.name, result)
return result
def _needs_modify(self, *args: Any, **kw: Any) -> NoReturn:
"_is_asyncio",
)
- _exec_once_mutex: _MutexProtocol
+ _exec_once_mutex: Optional[_MutexProtocol]
parent_listeners: Collection[_ListenerFnType]
listeners: Collection[_ListenerFnType]
_exec_once: bool
def _set_asyncio(self) -> None:
self._is_asyncio = True
- def _memoized_attr__exec_once_mutex(self) -> _MutexProtocol:
- if self._is_asyncio:
- return AsyncAdaptedLock()
- else:
- return threading.Lock()
+ def _get_exec_once_mutex(self) -> _MutexProtocol:
+ with util.mini_gil:
+ if self._exec_once_mutex is not None:
+ return self._exec_once_mutex
+
+ if self._is_asyncio:
+ mutex = AsyncAdaptedLock()
+ else:
+ mutex = threading.Lock() # type: ignore[assignment]
+ self._exec_once_mutex = mutex
+
+ return mutex
def _exec_once_impl(
self, retry_on_exception: bool, *args: Any, **kw: Any
) -> None:
- with self._exec_once_mutex:
+ with self._get_exec_once_mutex():
if not self._exec_once:
try:
self(*args, **kw)
raised an exception.
If _exec_w_sync_on_first_run was already called and didn't raise an
- exception, then a mutex is not used.
+ exception, then a mutex is not used. In very rare race conditions
+ the mutex may still be used more than once.
.. versionadded:: 1.4.11
"""
if not self._exec_w_sync_once:
- with self._exec_once_mutex:
+ with self._get_exec_once_mutex():
try:
self(*args, **kw)
except:
parent.update_subclass(target_cls)
self._exec_once = False
self._exec_w_sync_once = False
+ self._exec_once_mutex = None
self.parent_listeners = parent._clslevel[target_cls]
self.parent = parent
self.name = parent.name
local: _EmptyListener[_ET],
):
self._exec_once = False
+ self._exec_w_sync_once = False
+ self._exec_once_mutex = None
self.parent_dispatch = parent_dispatch
self.name = name
self.local = local
return self._columns.as_readonly()
def _setup_collections(self) -> None:
- assert "_columns" not in self.__dict__
- assert "primary_key" not in self.__dict__
- assert "foreign_keys" not in self.__dict__
-
- _columns: ColumnCollection[Any, Any] = ColumnCollection()
- primary_key = ColumnSet()
- foreign_keys: Set[KeyedColumnElement[Any]] = set()
-
- self._populate_column_collection(
- columns=_columns,
- primary_key=primary_key,
- foreign_keys=foreign_keys,
- )
+ with util.mini_gil:
+ # detect another thread that raced ahead
+ if "_columns" in self.__dict__:
+ assert "primary_key" in self.__dict__
+ assert "foreign_keys" in self.__dict__
+ return
+
+ _columns: ColumnCollection[Any, Any] = ColumnCollection()
+ primary_key = ColumnSet()
+ foreign_keys: Set[KeyedColumnElement[Any]] = set()
+
+ self._populate_column_collection(
+ columns=_columns,
+ primary_key=primary_key,
+ foreign_keys=foreign_keys,
+ )
- # assigning these three collections separately is not itself atomic,
- # but greatly reduces the surface for problems
- self._columns = _columns
- self.primary_key = primary_key # type: ignore
- self.foreign_keys = foreign_keys # type: ignore
+ # assigning these three collections separately is not itself
+ # atomic, but greatly reduces the surface for problems
+ self._columns = _columns
+ self.primary_key = primary_key # type: ignore
+ self.foreign_keys = foreign_keys # type: ignore
@util.ro_non_memoized_property
def entity_namespace(self) -> _EntityNamespace:
from . import config
from .util import gc_collect
+from ..util import freethreading
from ..util import has_compiled_ext
# keep it at 2.7, 3.1, 3.2, etc. for now.
py_version = ".".join([str(v) for v in sys.version_info[0:2]])
+ if freethreading:
+ py_version += "t"
platform_tokens = [
platform.machine(),
lambda: util.cpython, "cPython interpreter needed"
)
+ @property
+ def gil_enabled(self):
+ return exclusions.only_if(
+ lambda: not util.freethreading, "GIL-enabled build needed"
+ )
+
@property
def is64bit(self):
return exclusions.only_if(lambda: util.is64bit, "64bit required")
gc.collect() is called, as well as clean out unreferenced subclasses.
"""
- return self.cpython + self.only_linux
+ return self.cpython + self.gil_enabled
@property
def no_coverage(self):
],
)
+ def test_index_column_order(self, metadata, inspect_for_table):
+ """test for #12894"""
+ with inspect_for_table("sa_multi_index") as (schema, inspector):
+ test_table = Table(
+ "sa_multi_index",
+ metadata,
+ Column("Column1", Integer, primary_key=True),
+ Column("Column2", Integer),
+ Column("Column3", Integer),
+ )
+ Index(
+ "Index_Example",
+ test_table.c.Column3,
+ test_table.c.Column1,
+ test_table.c.Column2,
+ )
+ indexes = inspector.get_indexes("sa_multi_index")
+ eq_(indexes[0]["column_names"], ["Column3", "Column1", "Column2"])
+
@testing.requires.indexes_with_expressions
def test_reflect_expression_based_indexes(self, metadata, connection):
t = Table(
from .compat import dataclass_fields as dataclass_fields
from .compat import decode_backslashreplace as decode_backslashreplace
from .compat import dottedgetter as dottedgetter
+from .compat import freethreading as freethreading
from .compat import has_refcount_gc as has_refcount_gc
from .compat import inspect_getfullargspec as inspect_getfullargspec
from .compat import is64bit as is64bit
from .compat import local_dataclass_fields as local_dataclass_fields
+from .compat import mini_gil as mini_gil
from .compat import osx as osx
from .compat import py311 as py311
from .compat import py312 as py312
import operator
import platform
import sys
+import sysconfig
import typing
from typing import Any
from typing import Callable
py311 = sys.version_info >= (3, 11)
pypy = platform.python_implementation() == "PyPy"
cpython = platform.python_implementation() == "CPython"
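+# True on free-threaded (GIL-disabled) CPython builds; this reflects how
+# the interpreter was built, not whether the GIL is currently enabled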
+freethreading = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))
win32 = sys.platform.startswith("win")
osx = sys.platform.startswith("darwin")
return [f for f in dataclasses.fields(cls) if f not in super_fields]
else:
return []
+
+
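+# mini_gil guards rare lazy-initialization races that the GIL would
+# otherwise serialize; on regular GIL-enabled builds it collapses to a
+# no-op context manager, so there is no locking overhead there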
+if freethreading:
+ import threading
+
+ mini_gil = threading.RLock()
+ """provide a threading.RLock() under python freethreading only"""
+else:
+ import contextlib
+
+ mini_gil = contextlib.nullcontext() # type: ignore[assignment]
# the MIT License: https://www.opensource.org/licenses/mit-license.php
# mypy: allow-untyped-defs, allow-untyped-calls
+"""asyncio-related concurrency functions."""
+
from __future__ import annotations
import asyncio
This allows the functionality of memoized_property and
memoized_instancemethod to be available to a class using __slots__.
+ The memoized get is not threadsafe under free-threading, and the
+ creator method may in extremely rare cases be called more than once.
+
"""
__slots__ = ()
setattr(self, key, value)
return value
elif hasattr(self.__class__, f"_memoized_method_{key}"):
- fn = getattr(self, f"_memoized_method_{key}")
+ meth = getattr(self, f"_memoized_method_{key}")
def oneshot(*args, **kw):
- result = fn(*args, **kw)
+ result = meth(*args, **kw)
def memo(*a, **kw):
return result
- memo.__name__ = fn.__name__
- memo.__doc__ = fn.__doc__
+ memo.__name__ = meth.__name__
+ memo.__doc__ = meth.__doc__
setattr(self, key, memo)
return result
- oneshot.__doc__ = fn.__doc__
+ oneshot.__doc__ = meth.__doc__
return oneshot
else:
return self._fallback_getattr(key)
build-backend = "setuptools.build_meta"
requires = [
"setuptools>=77.0.3",
- "cython>=3; platform_python_implementation == 'CPython'", # Skip cython when using pypy
+ "cython>=3.1; platform_python_implementation == 'CPython'", # Skip cython when using pypy
]
import os
import platform
+import sys
+from typing import Any
from typing import cast
+from typing import Dict
from typing import TYPE_CHECKING
from setuptools import setup
assert _cy_Extension is not None
assert _cy_build_ext is not None
- cython_directives = {"language_level": "3"}
+ cython_directives: Dict[str, Any] = {
+ "language_level": "3",
+ }
+
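+ # Cython 3.1's "freethreading_compatible" directive marks the built
+ # extension modules as able to run without the GIL on free-threaded
+ # interpreters, so importing them does not re-enable the GIL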
+ if sys.version_info >= (3, 13):
+ cython_directives["freethreading_compatible"] = True
module_prefix = "sqlalchemy."
source_prefix = "lib/sqlalchemy/"
class WeakIdentityMapTest(_fixtures.FixtureTest):
run_inserts = None
+ def run_up_to_n_times(self, fn, times):
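+ """Run ``fn`` up to ``times`` times, stopping at the first successful
+ run; the last error is re-raised only if every attempt fails."""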
+ error = None
+ for _ in range(times):
+ try:
+ fn()
+ except Exception as err:
+ error = err
+ continue
+ else:
+ break
+ else:
+ if error:
+ raise error
+
@testing.requires.predictable_gc
def test_weakref(self):
"""test the weak-referencing identity map, which strongly-
references modified items."""
users, User = self.tables.users, self.classes.User
-
- s = fixture_session()
self.mapper_registry.map_imperatively(User, users)
- gc_collect()
- s.add(User(name="ed"))
- s.flush()
- assert not s.dirty
+ def go():
+ with Session(testing.db) as s:
+ gc_collect()
- user = s.query(User).one()
+ s.add(User(name="ed"))
+ s.flush()
+ assert not s.dirty
- # heisenberg the GC a little bit, since #7823 caused a lot more
- # GC when mappings are set up, larger test suite started failing
- # on this being gc'ed
- user_is = user._sa_instance_state
- del user
- gc_collect()
- gc_collect()
- gc_collect()
- assert user_is.obj() is None
+ user = s.query(User).one()
- assert len(s.identity_map) == 0
+ # heisenberg the GC a little bit, since #7823 caused a lot more
+ # GC when mappings are set up, larger test suite started
+ # failing on this being gc'ed
+ user_is = user._sa_instance_state
+ del user
+ gc_collect()
+ gc_collect()
+ gc_collect()
+ assert user_is.obj() is None
- user = s.query(User).one()
- user.name = "fred"
- del user
- gc_collect()
- assert len(s.identity_map) == 1
- assert len(s.dirty) == 1
- assert None not in s.dirty
- s.flush()
- gc_collect()
- assert not s.dirty
- assert not s.identity_map
+ assert len(s.identity_map) == 0
- user = s.query(User).one()
- assert user.name == "fred"
- assert s.identity_map
+ user = s.query(User).one()
+ user.name = "fred"
+ del user
+ gc_collect()
+ assert len(s.identity_map) == 1
+ assert len(s.dirty) == 1
+ assert None not in s.dirty
+ s.flush()
+ gc_collect()
+ assert not s.dirty
+ assert not s.identity_map
+
+ user = s.query(User).one()
+ assert user.name == "fred"
+ assert s.identity_map
+
+ self.run_up_to_n_times(go, 10)
@testing.requires.predictable_gc
def test_weakref_pickled(self):
users, User = self.tables.users, pickleable.User
-
- s = fixture_session()
self.mapper_registry.map_imperatively(User, users)
- gc_collect()
- s.add(User(name="ed"))
- s.flush()
- assert not s.dirty
+ def go():
+ with Session(testing.db) as s:
+ gc_collect()
- user = s.query(User).one()
- user.name = "fred"
- s.expunge(user)
+ s.add(User(name="ed"))
+ s.flush()
+ assert not s.dirty
- u2 = pickle.loads(pickle.dumps(user))
+ user = s.query(User).one()
+ user.name = "fred"
+ s.expunge(user)
- del user
- s.add(u2)
+ u2 = pickle.loads(pickle.dumps(user))
- del u2
- gc_collect()
+ del user
+ s.add(u2)
- assert len(s.identity_map) == 1
- assert len(s.dirty) == 1
- assert None not in s.dirty
- s.flush()
- gc_collect()
- assert not s.dirty
+ del u2
+ gc_collect()
- assert not s.identity_map
+ assert len(s.identity_map) == 1
+ assert len(s.dirty) == 1
+ assert None not in s.dirty
+ s.flush()
+ gc_collect()
+ assert not s.dirty
+
+ assert not s.identity_map
+
+ self.run_up_to_n_times(go, 10)
@testing.requires.predictable_gc
def test_weakref_with_cycles_o2m(self):
self.classes.User,
)
- s = fixture_session()
self.mapper_registry.map_imperatively(
User,
users,
self.mapper_registry.map_imperatively(Address, addresses)
gc_collect()
- s.add(User(name="ed", addresses=[Address(email_address="ed1")]))
- s.commit()
+ def go():
+ with Session(testing.db) as s:
+ s.add(
+ User(name="ed", addresses=[Address(email_address="ed1")])
+ )
+ s.commit()
- user = s.query(User).options(joinedload(User.addresses)).one()
- user.addresses[0].user # lazyload
- eq_(user, User(name="ed", addresses=[Address(email_address="ed1")]))
+ user = s.query(User).options(joinedload(User.addresses)).one()
+ user.addresses[0].user # lazyload
+ eq_(
+ user,
+ User(name="ed", addresses=[Address(email_address="ed1")]),
+ )
- del user
- gc_collect()
- assert len(s.identity_map) == 0
+ del user
+ gc_collect()
+ assert len(s.identity_map) == 0
- user = s.query(User).options(joinedload(User.addresses)).one()
- user.addresses[0].email_address = "ed2"
- user.addresses[0].user # lazyload
- del user
- gc_collect()
- assert len(s.identity_map) == 2
+ user = s.query(User).options(joinedload(User.addresses)).one()
+ user.addresses[0].email_address = "ed2"
+ user.addresses[0].user # lazyload
+ del user
+ gc_collect()
+ assert len(s.identity_map) == 2
- s.commit()
- user = s.query(User).options(joinedload(User.addresses)).one()
- eq_(user, User(name="ed", addresses=[Address(email_address="ed2")]))
+ s.commit()
+ user = s.query(User).options(joinedload(User.addresses)).one()
+ eq_(
+ user,
+ User(name="ed", addresses=[Address(email_address="ed2")]),
+ )
+
+ self.run_up_to_n_times(go, 10)
@testing.requires.predictable_gc
def test_weakref_with_cycles_o2o(self):
--- /dev/null
+import random
+import threading
+import time
+
+import sqlalchemy as sa
+from sqlalchemy import Integer
+from sqlalchemy import MetaData
+from sqlalchemy import String
+from sqlalchemy import testing
+from sqlalchemy.orm import scoped_session
+from sqlalchemy.orm import sessionmaker
+from sqlalchemy.testing import eq_
+from sqlalchemy.testing import fixtures
+from sqlalchemy.testing.schema import Column
+from sqlalchemy.testing.schema import Table
+
+NUM_THREADS = 10
+ITERATIONS = 10
+
+
+class _ThreadTest:
+ def run_threaded(
+ self,
+ func,
+ *thread_args,
+ nthreads=NUM_THREADS,
+ use_barrier=False,
+ **thread_kwargs,
+ ):
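+ """Run ``func`` concurrently in ``nthreads`` threads; each thread
+ receives a local result list and its thread name. Returns the
+ per-thread result tuples along with any (thread name, error) pairs;
+ when ``use_barrier`` is set, a Barrier makes all threads start their
+ work at the same moment."""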
+ barrier = threading.Barrier(nthreads)
+ results = []
+ errors = []
+
+ def thread_func(*args, **kwargs):
+ thread_name = threading.current_thread().name
+ if use_barrier:
+ barrier.wait()
+
+ local_result = []
+ try:
+ func(local_result, thread_name, *args, **kwargs)
+ results.append(tuple(local_result))
+ except Exception as e:
+ # raise
+ errors.append((thread_name, repr(e)))
+
+ threads = [
+ threading.Thread(
+ name=f"thread-{i}",
+ target=thread_func,
+ args=thread_args,
+ kwargs=thread_kwargs,
+ )
+ for i in range(nthreads)
+ ]
+
+ for thread in threads:
+ thread.start()
+ for thread in threads:
+ thread.join()
+
+ return results, errors
+
+ @testing.fixture
+ def num_threads_engine(self, testing_engine):
+ return testing_engine(options=dict(pool_size=NUM_THREADS))
+
+
+@testing.add_to_marker.timing_intensive
+class EngineThreadSafetyTest(_ThreadTest, fixtures.TablesTest):
+ run_dispose_bind = "once"
+
+ __requires__ = ("multithreading_support",)
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "test_table",
+ metadata,
+ Column(
+ "id", Integer, primary_key=True, test_needs_autoincrement=True
+ ),
+ Column("thread_id", Integer),
+ Column("data", String(50)),
+ )
+
+ @testing.combinations(
+ (NUM_THREADS, 0),
+ (3, 5),
+ (3, 0),
+ (7, 0),
+ argnames="pool_size, max_overflow",
+ )
+ def test_engine_thread_safe(self, testing_engine, pool_size, max_overflow):
+ """Test that a single Engine can be safely shared across threads."""
+ test_table = self.tables.test_table
+
+ engine = testing_engine(
+ options=dict(pool_size=pool_size, max_overflow=max_overflow)
+ )
+
+ def worker(results, thread_name):
+ for _ in range(ITERATIONS):
+ with engine.connect() as conn:
+ conn.execute(
+ test_table.insert(),
+ {"data": thread_name},
+ )
+ conn.commit()
+
+ result = conn.execute(
+ sa.select(test_table.c.data).where(
+ test_table.c.data == thread_name
+ )
+ ).scalar()
+ results.append(result)
+
+ results, errors = self.run_threaded(worker)
+
+ eq_(errors, [])
+ eq_(
+ set(results),
+ {
+ tuple([f"thread-{i}" for j in range(ITERATIONS)])
+ for i in range(NUM_THREADS)
+ },
+ )
+
+ def test_metadata_thread_safe(self, num_threads_engine):
+ """Test that MetaData objects are thread-safe for reads."""
+ metadata = sa.MetaData()
+
+ for thread_id in range(NUM_THREADS):
+ Table(
+ f"thread-{thread_id}",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("data", String(50)),
+ )
+
+ metadata.create_all(testing.db)
+
+ def worker(results, thread_name):
+ table_key = thread_name
+ assert table_key in metadata.tables, f"{table_key} does not exist"
+ with num_threads_engine.connect() as conn:
+ # Will raise if it cannot connect, so errors will be populated
+ conn.execute(sa.select(metadata.tables[table_key]))
+
+ _, errors = self.run_threaded(worker)
+ eq_(errors, [])
+
+
+@testing.add_to_marker.timing_intensive
+class SessionThreadingTest(_ThreadTest, fixtures.MappedTest):
+ run_dispose_bind = "once"
+
+ __requires__ = ("multithreading_support",)
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "users",
+ metadata,
+ Column(
+ "id", Integer, primary_key=True, test_needs_autoincrement=True
+ ),
+ Column("name", String(50)),
+ Column("thread_id", String(50)),
+ )
+
+ @classmethod
+ def setup_classes(cls):
+ class User(cls.Comparable):
+ pass
+
+ def test_sessionmaker_thread_safe(self, num_threads_engine):
+ """Test that sessionmaker factory is thread-safe."""
+ users, User = self.tables.users, self.classes.User
+ self.mapper_registry.map_imperatively(User, users)
+
+ # Single sessionmaker shared across threads
+ SessionFactory = sessionmaker(num_threads_engine)
+
+ def worker(results, thread_name):
+ thread_id = thread_name
+
+ for _ in range(ITERATIONS):
+ with SessionFactory() as session:
+ for i in range(3):
+ user = User(
+ name=f"user_{thread_id}_{i}", thread_id=thread_id
+ )
+ session.add(user)
+ session.commit()
+
+ count = (
+ session.query(User)
+ .filter_by(thread_id=thread_id)
+ .count()
+ )
+ results.append(count)
+
+ results, errors = self.run_threaded(worker)
+
+ eq_(errors, [])
+ eq_(
+ results,
+ [
+ tuple(range(3, 3 * ITERATIONS + 3, 3))
+ for _ in range(NUM_THREADS)
+ ],
+ )
+
+ def test_scoped_session_thread_local(self, num_threads_engine):
+ """Test that scoped_session provides thread-local sessions."""
+ users, User = self.tables.users, self.classes.User
+ self.mapper_registry.map_imperatively(User, users)
+
+ # Create scoped session
+ Session = scoped_session(sessionmaker(num_threads_engine))
+
+ session_ids = {}
+
+ def worker(results, thread_name):
+ thread_id = thread_name
+
+ session = Session()
+ session_ids[thread_id] = id(session)
+ session.close()
+
+ for _ in range(ITERATIONS):
+ user = User(
+ name=f"scoped_user_{thread_id}", thread_id=thread_id
+ )
+ Session.add(user)
+ Session.commit()
+
+ session2 = Session()
+ assert id(session2) == session_ids[thread_id]
+ session2.close()
+
+ count = (
+ Session.query(User).filter_by(thread_id=thread_id).count()
+ )
+ results.append(count)
+ Session.remove()
+
+ results, errors = self.run_threaded(worker)
+
+ eq_(errors, [])
+ unique_sessions = set(session_ids.values())
+ eq_(len(unique_sessions), NUM_THREADS)
+ eq_(
+ results,
+ [tuple(range(1, ITERATIONS + 1)) for _ in range(NUM_THREADS)],
+ )
+
+
+@testing.add_to_marker.timing_intensive
+class FromClauseConcurrencyTest(_ThreadTest, fixtures.TestBase):
+ """test for issue #12302"""
+
+ @testing.variation("collection", ["c", "primary_key", "foreign_keys"])
+ def test_c_collection(self, collection):
+ dictionary_meta = MetaData()
+ all_indexes_table = Table(
+ "all_indexes",
+ dictionary_meta,
+ *[Column(f"col{i}", Integer) for i in range(50)],
+ )
+
+ def use_table(results, errors):
+ for i in range(3):
+ time.sleep(random.random() * 0.0001)
+ if collection.c:
+ all_indexes.c.col35
+ elif collection.primary_key:
+ all_indexes.primary_key
+ elif collection.foreign_keys:
+ all_indexes.foreign_keys
+
+ for j in range(1000):
+ all_indexes = all_indexes_table.alias("a_indexes")
+
+ results, errors = self.run_threaded(
+ use_table, use_barrier=False, nthreads=5
+ )
+
+ eq_(errors, [])
+ eq_(len(results), 5)
self.Target = Target
+ @testing.requires.predictable_gc
def test_subclass(self):
class SubTarget(self.Target):
pass
def test_orm_quickstart(self):
self._run_doctest("orm/quickstart.rst")
+ # this crashes on 3.13t but passes on 3.14t.
+ # just requiring non-freethreaded for now
+ @requires.gil_enabled
@requires.greenlet
def test_asyncio(self):
try:
evt.connect()
def checkout():
+ barrier.wait()
for j in range(2):
c1 = pool.connect()
time.sleep(0.02)
# any of the connections get returned. so first_connect()
# sleeps for one second, then pings the mock. the threads should
# not have made it to the "checkout() event for that one second.
+ barrier = threading.Barrier(5)
for i in range(5):
th = threading.Thread(target=checkout)
th.start()
timeouts = []
def checkout():
+ barrier.wait()
for x in range(1):
now = time.time()
try:
time.sleep(4)
c1.close()
+ barrier = threading.Barrier(10)
threads = []
for i in range(10):
th = threading.Thread(target=checkout)
# /home/classic/dev/sqlalchemy/test/profiles.txt
# This file is written out on a per-environment basis.
-# For each test in aaa_profiling, the corresponding function and
+# For each test in aaa_profiling, the corresponding function and
# environment is located within this file. If it doesn't exist,
# the test is skipped.
-# If a callcount does exist, it is compared to what we received.
+# If a callcount does exist, it is compared to what we received.
# assertions are raised if the counts do not match.
-#
-# To add a new callcount test, apply the function_call_count
-# decorator and re-run the tests using the --write-profiles
+#
+# To add a new callcount test, apply the function_call_count
+# decorator and re-run the tests using the --write-profiles
# option - this file will be rewritten including the new count.
-#
+#
# TEST: test.aaa_profiling.test_compiler.CompileTest.test_insert
# TEST: test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_cextensions 78
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_nocextensions 78
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_cextensions 79
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_nocextensions 79
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_cextensions 72
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_nocextensions 72
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_cextensions 73
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_nocextensions 73
# TEST: test.aaa_profiling.test_pool.QueuePoolTest.test_second_connect
gc.collect() is called, as well as clean out unreferenced subclasses.
"""
- return self.cpython + self.only_linux + skip_if("+aiosqlite")
+ return self.cpython + self.gil_enabled + skip_if("+aiosqlite")
+
+ @property
+ def multithreading_support(self):
+ """target platform allows use of threads without any problem"""
+
+ return skip_if("+aiosqlite") + skip_if(self.sqlite_memory)
@property
def memory_process_intensive(self):
"""Test various algorithmic properties of selectables."""
from itertools import zip_longest
-import random
-import threading
-import time
from sqlalchemy import and_
from sqlalchemy import bindparam
a3 = a2._clone()
a3._copy_internals()
is_(a1.corresponding_column(a3.c.c), a1.c.c)
-
-
-class FromClauseConcurrencyTest(fixtures.TestBase):
- """test for issue 12302"""
-
- @testing.requires.timing_intensive
- def test_c_collection(self):
- dictionary_meta = MetaData()
- all_indexes_table = Table(
- "all_indexes",
- dictionary_meta,
- *[Column(f"col{i}", Integer) for i in range(50)],
- )
-
- fails = 0
-
- def use_table():
- nonlocal fails
- try:
- for i in range(3):
- time.sleep(random.random() * 0.0001)
- all_indexes.c.col35
- except:
- fails += 1
- raise
-
- for j in range(1000):
- all_indexes = all_indexes_table.alias("a_indexes")
-
- threads = [threading.Thread(target=use_table) for i in range(5)]
- for t in threads:
- t.start()
- for t in threads:
- t.join()
-
- assert not fails, "one or more runs failed"
asyncio
sqlite: aiosqlite
sqlite_file: aiosqlite
- postgresql: postgresql_asyncpg
+
+ # asyncpg doesn't build under free-threading
+ py{38,39,310,311,312,313,314}-postgresql: postgresql_asyncpg
+
mysql: asyncmy
mysql: aiomysql
- mssql: aioodbc
+
+ # aioodbc builds under free-threading but segfaults once it is used
+ py{38,39,310,311,312,313,314}-mssql: aioodbc
[testenv]
cov_args=--cov=sqlalchemy --cov-report term --cov-append --cov-report xml --exclude-tag memory-intensive --exclude-tag timing-intensive -k "not aaa_profiling"
# this can be limited to specific python versions IF there is no
# greenlet available for the most recent python. otherwise
# keep this present in all cases
- py{38,39,310,311,312,313,314}: {[greenletextras]extras}
+ # py{38,39,310,311,312,313,314,313t,314t}: {[greenletextras]extras}
+
+ {[greenletextras]extras}
postgresql: postgresql
postgresql: postgresql_pg8000
WORKERS={env:TOX_WORKERS:-n4 --max-worker-restart=5}
+ {py313t,py314t}: PYTHON_GIL=0 # when running with 313t, 314t, go for broke
+
# affect setup.py to skip or build the cython extensions.
nocext: DISABLE_SQLALCHEMY_CEXT=1
cext: REQUIRE_SQLALCHEMY_CEXT=1
oracle: WORKERS={env:TOX_WORKERS:-n2 --max-worker-restart=5}
oracle: ORACLE={env:TOX_ORACLE:--db oracle}
oracle: EXTRA_ORACLE_DRIVERS={env:EXTRA_ORACLE_DRIVERS:--dbdriver cx_oracle --dbdriver oracledb --dbdriver oracledb_async}
+ oracle-nogreenlet: EXTRA_ORACLE_DRIVERS={env:EXTRA_ORACLE_DRIVERS:--dbdriver cx_oracle --dbdriver oracledb}
sqlite: SQLITE={env:TOX_SQLITE:--db sqlite}
- sqlite-py{38,39,310,311,312,313}: EXTRA_SQLITE_DRIVERS={env:EXTRA_SQLITE_DRIVERS:--dbdriver sqlite --dbdriver pysqlite_numeric --dbdriver aiosqlite}
- sqlite-{py314,nogreenlet}: EXTRA_SQLITE_DRIVERS={env:EXTRA_SQLITE_DRIVERS:--dbdriver sqlite --dbdriver pysqlite_numeric}
+ sqlite-py{38,39,310,311,312,313,314}: EXTRA_SQLITE_DRIVERS={env:EXTRA_SQLITE_DRIVERS:--dbdriver sqlite --dbdriver pysqlite_numeric --dbdriver aiosqlite}
+ sqlite-nogreenlet: EXTRA_SQLITE_DRIVERS={env:EXTRA_SQLITE_DRIVERS:--dbdriver sqlite --dbdriver pysqlite_numeric}
sqlite_file: SQLITE={env:TOX_SQLITE_FILE:--db sqlite_file}
sqlite_file: EXTRA_SQLITE_DRIVERS={env:EXTRA_SQLITE_DRIVERS:--dbdriver sqlite --dbdriver aiosqlite}
- sqlite_file-{py314,nogreenlet}: EXTRA_SQLITE_DRIVERS={env:EXTRA_SQLITE_DRIVERS:--dbdriver sqlite}
+ sqlite_file-nogreenlet: EXTRA_SQLITE_DRIVERS={env:EXTRA_SQLITE_DRIVERS:--dbdriver sqlite}
postgresql: POSTGRESQL={env:TOX_POSTGRESQL:--db postgresql}
postgresql: EXTRA_PG_DRIVERS={env:EXTRA_PG_DRIVERS:--dbdriver psycopg2 --dbdriver asyncpg --dbdriver pg8000 --dbdriver psycopg --dbdriver psycopg_async}
postgresql-nogreenlet: EXTRA_PG_DRIVERS={env:EXTRA_PG_DRIVERS:--dbdriver psycopg2 --dbdriver pg8000 --dbdriver psycopg}
+ postgresql-{py313t,py314t}: EXTRA_PG_DRIVERS={env:EXTRA_PG_DRIVERS:--dbdriver psycopg2 --dbdriver pg8000 --dbdriver psycopg --dbdriver psycopg_async}
mysql: MYSQL={env:TOX_MYSQL:--db mysql}
mysql: EXTRA_MYSQL_DRIVERS={env:EXTRA_MYSQL_DRIVERS:--dbdriver mysqldb --dbdriver pymysql --dbdriver asyncmy --dbdriver aiomysql --dbdriver mariadbconnector}
mysql-nogreenlet: EXTRA_MYSQL_DRIVERS={env:EXTRA_MYSQL_DRIVERS:--dbdriver mysqldb --dbdriver pymysql --dbdriver mariadbconnector}
+ # for mssql, aioodbc frequently segfaults under free-threaded builds
mssql: MSSQL={env:TOX_MSSQL:--db mssql}
mssql: EXTRA_MSSQL_DRIVERS={env:EXTRA_MSSQL_DRIVERS:--dbdriver pyodbc --dbdriver aioodbc --dbdriver pymssql}
mssql-py314: EXTRA_MSSQL_DRIVERS={env:EXTRA_MSSQL_DRIVERS:--dbdriver pyodbc --dbdriver aioodbc}
- mssql-nogreenlet: EXTRA_MSSQL_DRIVERS={env:EXTRA_MSSQL_DRIVERS:--dbdriver pyodbc --dbdriver pymssql}
- mssql-py314-nogreenlet: EXTRA_MSSQL_DRIVERS={env:EXTRA_MSSQL_DRIVERS:--dbdriver pyodbc}
+ mssql-{py313t,py314t,nogreenlet}: EXTRA_MSSQL_DRIVERS={env:EXTRA_MSSQL_DRIVERS:--dbdriver pyodbc --dbdriver pymssql}
oracle,mssql,sqlite_file: IDENTS=--write-idents db_idents.txt
EXTRA_PG_DRIVERS
EXTRA_MYSQL_DRIVERS
EXTRA_ORACLE_DRIVERS
+ PYTHON_GIL
commands=
# command run in the github action when cext are active.
-[testenv:github-cext]
+# unfortunately it seems impossible to use generative tags with envs
+# that are not the default, so in the interim we build out three
+# separate envs with explicit names
+
+[testenv:githubbase]
extras=
- {[greenletextras]extras}
+setenv=
+ PYTHONNOUSERSITE=1
+
+ WORKERS={env:TOX_WORKERS:-n4 --max-worker-restart=5}
+ PYTEST_COLOR={tty:--color=yes}
+ PYTEST_EXCLUDES=-m "not memory_intensive and not mypy and not timing_intensive"
+ SQLITE=--db sqlite
-deps = {[testenv]deps}
- .[aiosqlite]
commands=
- python -m pytest {env:PYTEST_COLOR} {env:WORKERS} {env:SQLITE:} {env:POSTGRESQL:} {env:MYSQL:} {env:ORACLE:} {env:MSSQL:} {env:IDENTS:} {env:PYTEST_EXCLUDES:} {env:COVERAGE:} {posargs}
+ python -m pytest {env:PYTEST_COLOR} {env:WORKERS} {env:PYTEST_EXCLUDES:} {env:SQLITE} {posargs}
oracle,mssql,sqlite_file: python reap_dbs.py db_idents.txt
-# command run in the github action when cext are not active.
-[testenv:github-nocext]
+[testenv:github-cext-greenlet]
extras=
- {[greenletextras]extras}
+ asyncio
+ aiosqlite
+setenv=
+ {[testenv:githubbase]setenv}
+ REQUIRE_SQLALCHEMY_CEXT=1
+ SQLITE=--db sqlite --db aiosqlite
+commands=
+ python -m pytest {env:PYTEST_COLOR} {env:WORKERS} {env:PYTEST_EXCLUDES:} {env:SQLITE} {posargs}
+ oracle,mssql,sqlite_file: python reap_dbs.py db_idents.txt
+
+[testenv:github-cext]
+extras=
+setenv=
+ {[testenv:githubbase]setenv}
+ REQUIRE_SQLALCHEMY_CEXT=1
+deps = {[testenv:githubbase]deps}
+commands=
+ python -m pytest {env:PYTEST_COLOR} {env:WORKERS} {env:PYTEST_EXCLUDES:} {env:SQLITE} {posargs}
+ oracle,mssql,sqlite_file: python reap_dbs.py db_idents.txt
-deps = {[testenv]deps}
- .[aiosqlite]
+
+[testenv:github-nocext]
+extras=
+setenv=
+ {[testenv:githubbase]setenv}
+ DISABLE_SQLALCHEMY_CEXT=1
+deps = {[testenv:githubbase]deps}
commands=
- python -m pytest {env:PYTEST_COLOR} {env:WORKERS} {env:SQLITE:} {env:POSTGRESQL:} {env:MYSQL:} {env:ORACLE:} {env:MSSQL:} {env:IDENTS:} {env:PYTEST_EXCLUDES:} {env:COVERAGE:} {posargs}
+ python -m pytest {env:PYTEST_COLOR} {env:WORKERS} {env:PYTEST_EXCLUDES:} {env:SQLITE} {posargs}
oracle,mssql,sqlite_file: python reap_dbs.py db_idents.txt
+