if rec is None:
if cache_key is not _cache_key.NO_CACHE:
- rec = AnalyzedFunction(
- tracker, self, apply_propagate_attrs, fn
- )
- rec.closure_bindparams = bindparams
- lambda_cache[tracker_key + cache_key] = rec
+ with AnalyzedCode._generation_mutex:
+ key = tracker_key + cache_key
+ if key not in lambda_cache:
+ rec = AnalyzedFunction(
+ tracker, self, apply_propagate_attrs, fn
+ )
+ rec.closure_bindparams = bindparams
+ lambda_cache[key] = rec
+ else:
+ rec = lambda_cache[key]
else:
rec = NonAnalyzedFunction(self._invoke_user_fn(fn))
Target must support simultaneous, independent database connections.
"""
- # This is also true of some configurations of UnixODBC and probably
- # win32 ODBC as well.
+        # note: **do not** let any sqlite driver run "independent
+        # connection" tests.  For a concurrency-related test that
+        # performs only reads, use independent_readonly_connections
+        # instead, which sqlite can support.
+ return skip_if(["sqlite"])
+
+ @property
+ def independent_readonly_connections(self):
+ """
+ Target must support simultaneous, independent database connections
+ that will be used in a readonly fashion.
+
+ """
return skip_if(
[
- no_support(
- "sqlite",
- "independent connections disabled "
- "when :memory: connections are used",
- ),
- exclude(
- "mssql",
- "<",
- (9, 0, 0),
- "SQL Server 2005+ is required for "
- "independent connections",
- ),
+ self._sqlite_memory_db,
+ "+aiosqlite",
]
)
+from __future__ import annotations
+
+import threading
+import time
+from typing import List
+from typing import Optional
+
from sqlalchemy import exc
from sqlalchemy import testing
from sqlalchemy.future import select as future_select
eq_(e12key[0], e1key[0])
eq_(e32key[0], e3key[0])
+
+
+class ConcurrencyTest(fixtures.TestBase):
+    """Concurrency tests for the lambda statement cache; see issues
+    #8098 and #9461."""
+
+ __requires__ = ("independent_readonly_connections",)
+
+ __only_on__ = ("+psycopg2", "+mysqldb", "+pysqlite", "+pymysql")
+
+ THREADS = 10
+
+ @testing.fixture
+ def mapping_fixture(self, decl_base):
+ class A(decl_base):
+ __tablename__ = "a"
+ id = Column(Integer, primary_key=True)
+ col1 = Column(String(100))
+ col2 = Column(String(100))
+ col3 = Column(String(100))
+ col4 = Column(String(100))
+
+ decl_base.metadata.create_all(testing.db)
+
+ from sqlalchemy.orm import Session
+
+ with testing.db.connect() as conn:
+ with Session(conn) as session:
+ session.add_all(
+ [
+ A(col1=str(i), col2=str(i), col3=str(i), col4=str(i))
+ for i in range(self.THREADS + 1)
+ ]
+ )
+ session.commit()
+
+ return A
+
+ @testing.requires.timing_intensive
+ def test_lambda_concurrency(self, testing_engine, mapping_fixture):
+ A = mapping_fixture
+ engine = testing_engine(options={"pool_size": self.THREADS + 5})
+ NUM_OF_LAMBDAS = 150
+
+ code = """
+from sqlalchemy import lambda_stmt, select
+
+
+def generate_lambda_stmt(wanted):
+ stmt = lambda_stmt(lambda: select(A.col1, A.col2, A.col3, A.col4))
+"""
+
+ for _ in range(NUM_OF_LAMBDAS):
+ code += (
+ " stmt += lambda s: s.where((A.col1 == wanted) & "
+ "(A.col2 == wanted) & (A.col3 == wanted) & "
+ "(A.col4 == wanted))\n"
+ )
+
+ code += """
+ return stmt
+"""
+
+ d = {"A": A, "__name__": "lambda_fake"}
+ exec(code, d)
+ generate_lambda_stmt = d["generate_lambda_stmt"]
+
+ runs: List[Optional[int]] = [None for _ in range(self.THREADS)]
+ conns = [engine.connect() for _ in range(self.THREADS)]
+
+ def run(num):
+ wanted = str(num)
+ connection = conns[num]
+ time.sleep(0.1)
+ stmt = generate_lambda_stmt(wanted)
+ time.sleep(0.1)
+ row = connection.execute(stmt).first()
+ if not row:
+ runs[num] = False
+ else:
+ runs[num] = True
+
+ threads = [
+ threading.Thread(target=run, args=(num,))
+ for num in range(self.THREADS)
+ ]
+
+ for thread in threads:
+ thread.start()
+ for thread in threads:
+ thread.join(timeout=10)
+ for conn in conns:
+ conn.close()
+
+ fails = len([r for r in runs if r is False])
+ assert not fails, f"{fails} runs failed"