Fixed critical regression caused by the change in :ticket:`5497` where the
connection pool "init" phase no longer occurred within mutexed isolation,
allowing other threads to proceed with the dialect uninitialized, which
could then impact the compilation of SQL statements.
This issue is essentially the same regression which was fixed many years
ago in :ticket:`2964` in
dd32540dabbee0678530fb1b0868d1eb41572dca,
which was missed this time as the test suite for
that issue only tested the pool in isolation, and assumed the
"first_connect" event would be used by the Engine. However
:ticket:`5497` stopped using "first_connect" and no test detected
the lack of mutexing; this has been resolved here through
the addition of more tests.
This fix also identifies what is probably a bug in earlier versions
of SQLAlchemy where the "first_connect" handler would be cancelled
if the initializer failed; this is evidenced by
test_explode_in_initializer which was doing a reconnect due to
c.rollback() yet wasn't hanging. We now solve this issue by
preventing the manufactured Connection from ever reconnecting
inside the first_connect handler.
Also remove the "_sqla_unwrap" test attribute; this is almost
never used anymore; however, we can use a more targeted
wrapper supplied by the testing.engines.proxying_engine
function.
See if we can also open up Oracle for "ad hoc engines" tests
now that we have better connection management logic.
Fixes: #6337
Change-Id: I4a3476625c4606f1a304dbc940d500325e8adc1a
--- /dev/null
+.. change::
+ :tags: bug, engine, regression
+ :tickets: 6337
+
+ Fixed critical regression caused by the change in :ticket:`5497` where the
+ connection pool "init" phase no longer occurred within mutexed isolation,
+ allowing other threads to proceed with the dialect uninitialized, which
+ could then impact the compilation of SQL statements.
+
_execution_options=None,
_dispatch=None,
_has_events=None,
+ _allow_revalidate=True,
):
"""Construct a new Connection."""
self.engine = engine
self.__in_begin = False
self.should_close_with_result = close_with_result
- self.__can_reconnect = True
+ self.__can_reconnect = _allow_revalidate
self._echo = self.engine._should_log_info()
if _has_events is None:
if k in kwargs:
engine_args[k] = pop_kwarg(k)
+ # internal flags used by the test suite for instrumenting / proxying
+ # engines with mocks etc.
_initialize = kwargs.pop("_initialize", True)
+ _wrap_do_on_connect = kwargs.pop("_wrap_do_on_connect", None)
# all kwargs should be consumed
if kwargs:
engine = engineclass(pool, dialect, u, **engine_args)
if _initialize:
+
do_on_connect = dialect.on_connect_url(url)
if do_on_connect:
+ if _wrap_do_on_connect:
+ do_on_connect = _wrap_do_on_connect(do_on_connect)
def on_connect(dbapi_connection, connection_record):
- conn = getattr(
- dbapi_connection, "_sqla_unwrap", dbapi_connection
- )
- if conn is None:
- return
-
- do_on_connect(conn)
+ do_on_connect(dbapi_connection)
event.listen(pool, "connect", on_connect)
def first_connect(dbapi_connection, connection_record):
c = base.Connection(
- engine, connection=dbapi_connection, _has_events=False
+ engine,
+ connection=dbapi_connection,
+ _has_events=False,
+ # reconnecting will be a reentrant condition, so if the
+ # connection goes away, Connection is then closed
+ _allow_revalidate=False,
)
c._execution_options = util.EMPTY_DICT
try:
dialect.initialize(c)
finally:
- dialect.do_rollback(c.connection)
+ # note that "invalidated" and "closed" are mutually
+ # exclusive in 1.4 Connection.
+ if not c.invalidated and not c.closed:
+ # transaction is rolled back otherwise, tested by
+ # test/dialect/postgresql/test_dialect.py
+ # ::MiscBackendTest::test_initial_transaction_state
+ dialect.do_rollback(c.connection)
# previously, the "first_connect" event was used here, which was then
# scaled back if the "on_connect" handler were present. now,
class _CompoundListener(_InstanceLevelDispatch):
- __slots__ = "_exec_once_mutex", "_exec_once"
+ __slots__ = "_exec_once_mutex", "_exec_once", "_exec_w_sync_once"
def _set_asyncio(self):
self._exec_once_mutex = AsyncAdaptedLock()
if not self._exec_once:
self._exec_once_impl(True, *args, **kw)
+ def _exec_w_sync_on_first_run(self, *args, **kw):
+ """Execute this event, and use a mutex if it has not been
+ executed already for this collection, or was called
+ by a previous _exec_w_sync_on_first_run call and
+ raised an exception.
+
+ If _exec_w_sync_on_first_run was already called and didn't raise an
+ exception, then a mutex is not used.
+
+ .. versionadded:: 1.4.11
+
+ """
+ if not self._exec_w_sync_once:
+ with self._exec_once_mutex:
+ try:
+ self(*args, **kw)
+ except:
+ raise
+ else:
+ self._exec_w_sync_once = True
+ else:
+ self(*args, **kw)
+
def __call__(self, *args, **kw):
"""Execute this event."""
if target_cls not in parent._clslevel:
parent.update_subclass(target_cls)
self._exec_once = False
+ self._exec_w_sync_once = False
self.parent_listeners = parent._clslevel[target_cls]
self.parent = parent
self.name = parent.name
def __init__(self, pool, connect=True):
self.__pool = pool
if connect:
- self.__connect(first_connect_check=True)
+ self.__connect()
self.finalize_callback = deque()
fresh = False
self.__pool._close_connection(self.connection)
self.connection = None
- def __connect(self, first_connect_check=False):
+ def __connect(self):
pool = self.__pool
# ensure any existing connection is removed, so that if
with util.safe_reraise():
pool.logger.debug("Error on connect(): %s", e)
else:
- if first_connect_check:
+ # in SQLAlchemy 1.4 the first_connect event is not used by
+ # the engine, so this will usually not be set
+ if pool.dispatch.first_connect:
pool.dispatch.first_connect.for_modify(
pool.dispatch
).exec_once_unless_exception(self.connection, self)
- if pool.dispatch.connect:
- pool.dispatch.connect(self.connection, self)
+
+ # init of the dialect now takes place within the connect
+ # event, so ensure a mutex is used on the first run
+ pool.dispatch.connect.for_modify(
+ pool.dispatch
+ )._exec_w_sync_on_first_run(self.connection, self)
def _finalize_fairy(
"""
def __init__(self, engine, cursor_cls):
- self.conn = self._sqla_unwrap = engine.pool._creator()
+ self.conn = engine.pool._creator()
self.engine = engine
self.cursor_cls = cursor_cls
def mock_conn():
return conn_cls(config.db, cursor_cls)
- return testing_engine(options={"creator": mock_conn})
+ def _wrap_do_on_connect(do_on_connect):
+ def go(dbapi_conn):
+ return do_on_connect(dbapi_conn.conn)
+
+ return go
+
+ return testing_engine(
+ options={
+ "creator": mock_conn,
+ "_wrap_do_on_connect": _wrap_do_on_connect,
+ }
+ )
+from sqlalchemy import event
from sqlalchemy.pool import QueuePool
from sqlalchemy.testing import AssertsExecutionResults
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import profiling
-
pool = None
global pool
pool = QueuePool(creator=self.Connection, pool_size=3, max_overflow=-1)
+ # make this a real world case where we have a "connect" handler
+ @event.listens_for(pool, "connect")
+ def do_connect(dbapi_conn, conn_record):
+ pass
+
@profiling.function_call_count()
def test_first_connect(self):
pool.connect()
from contextlib import contextmanager
import re
+import threading
import weakref
import sqlalchemy as tsa
from sqlalchemy.engine import default
from sqlalchemy.engine.base import Connection
from sqlalchemy.engine.base import Engine
+from sqlalchemy.pool import NullPool
from sqlalchemy.pool import QueuePool
from sqlalchemy.sql import column
from sqlalchemy.sql import literal
conn.close()
-class HandleInvalidatedOnConnectTest(fixtures.TestBase):
+class OnConnectTest(fixtures.TestBase):
__requires__ = ("sqlite",)
def setup_test(self):
c.close()
c2.close()
+ @testing.only_on("sqlite+pysqlite")
+ def test_initialize_connect_race(self):
+ """test for :ticket:`6337` fixing the regression in :ticket:`5497`,
+ dialect init is mutexed"""
+
+ m1 = []
+ cls_ = testing.db.dialect.__class__
+
+ class SomeDialect(cls_):
+ def initialize(self, connection):
+ super(SomeDialect, self).initialize(connection)
+ m1.append("initialize")
+
+ def on_connect(self):
+ oc = super(SomeDialect, self).on_connect()
+
+ def my_on_connect(conn):
+ if oc:
+ oc(conn)
+ m1.append("on_connect")
+
+ return my_on_connect
+
+ u1 = Mock(
+ username=None,
+ password=None,
+ host=None,
+ port=None,
+ query={},
+ database=None,
+ _instantiate_plugins=lambda kw: (u1, [], kw),
+ _get_entrypoint=Mock(
+ return_value=Mock(get_dialect_cls=lambda u: SomeDialect)
+ ),
+ )
+
+ for j in range(5):
+ m1[:] = []
+ eng = create_engine(
+ u1,
+ poolclass=NullPool,
+ connect_args={"check_same_thread": False},
+ )
+
+ def go():
+ c = eng.connect()
+ c.execute(text("select 1"))
+ c.close()
+
+ threads = [threading.Thread(target=go) for i in range(10)]
+ for t in threads:
+ t.start()
+ for t in threads:
+ t.join()
+
+ eq_(m1, ["on_connect", "initialize"] + ["on_connect"] * 9)
+
class DialectEventTest(fixtures.TestBase):
@contextmanager
class PoolFirstConnectSyncTest(PoolTestBase):
- # test [ticket:2964]
+ """test for :ticket:`2964`, where the pool would not mutex the
+ initialization of the dialect.
+
+ Unfortunately, as discussed in :ticket:`6337`, this test suite did not
+ ensure that the ``Engine`` itself actually uses the "first_connect" event,
+ so when :ticket:`5497` came along, the "first_connect" event was no longer
+ used and no test detected the re-introduction of the exact same race
+ condition, which was now worse as the un-initialized dialect would now
+ pollute the SQL cache causing the application to not work at all.
+
+ A new suite has therefore been added in test/engine/test_execute.py->
+ OnConnectTest::test_initialize_connect_race to ensure that the engine
+ in total synchronizes the "first_connect" process, which now works
+ using a new events feature _exec_w_sync_on_first_run.
+
+ """
@testing.requires.timing_intensive
def test_sync(self):
conn.invalidate()
conn.invalidate()
- @testing.skip_if(
- [lambda: util.py3k, "oracle+cx_oracle"], "Crashes on py3k+cx_oracle"
- )
def test_explode_in_initializer(self):
engine = engines.testing_engine()
# raises a DBAPIError, not an AttributeError
assert_raises(exc.DBAPIError, engine.connect)
- @testing.skip_if(
- [lambda: util.py3k, "oracle+cx_oracle"], "Crashes on py3k+cx_oracle"
- )
def test_explode_in_initializer_disconnect(self):
engine = engines.testing_engine()
class IsolationLevelTest(fixtures.TestBase):
- __requires__ = ("isolation_level", "ad_hoc_engines")
+ __requires__ = (
+ "isolation_level",
+ "ad_hoc_engines",
+ "legacy_isolation_level",
+ )
__backend__ = True
def _default_isolation_level(self):
# /home/classic/dev/sqlalchemy/test/profiles.txt
# This file is written out on a per-environment basis.
-# For each test in aaa_profiling, the corresponding function and
+# For each test in aaa_profiling, the corresponding function and
# environment is located within this file. If it doesn't exist,
# the test is skipped.
-# If a callcount does exist, it is compared to what we received.
+# If a callcount does exist, it is compared to what we received.
# assertions are raised if the counts do not match.
-#
-# To add a new callcount test, apply the function_call_count
-# decorator and re-run the tests using the --write-profiles
+#
+# To add a new callcount test, apply the function_call_count
+# decorator and re-run the tests using the --write-profiles
# option - this file will be rewritten including the new count.
-#
+#
# TEST: test.aaa_profiling.test_compiler.CompileTest.test_insert
# TEST: test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_2.7_sqlite_pysqlite_dbapiunicode_cextensions 102
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_2.7_sqlite_pysqlite_dbapiunicode_nocextensions 102
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.8_sqlite_pysqlite_dbapiunicode_cextensions 84
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.8_sqlite_pysqlite_dbapiunicode_nocextensions 84
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.9_sqlite_pysqlite_dbapiunicode_cextensions 86
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.9_sqlite_pysqlite_dbapiunicode_nocextensions 86
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_2.7_sqlite_pysqlite_dbapiunicode_cextensions 90
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_2.7_sqlite_pysqlite_dbapiunicode_nocextensions 90
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.8_sqlite_pysqlite_dbapiunicode_cextensions 72
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.8_sqlite_pysqlite_dbapiunicode_nocextensions 72
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.9_sqlite_pysqlite_dbapiunicode_cextensions 74
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.9_sqlite_pysqlite_dbapiunicode_nocextensions 74
# TEST: test.aaa_profiling.test_pool.QueuePoolTest.test_second_connect
"pypostgresql bombs on multiple isolation level calls",
)
+ @property
+ def legacy_isolation_level(self):
+ # refers to the engine isolation_level setting
+ return only_on(
+ ("postgresql", "sqlite", "mysql", "mariadb", "mssql"),
+ "DBAPI has no isolation level support",
+ ) + fails_on(
+ "postgresql+pypostgresql",
+ "pypostgresql bombs on multiple isolation level calls",
+ )
+
def get_isolation_levels(self, config):
levels = set(config.db.dialect._isolation_lookup)
@property
def ad_hoc_engines(self):
- return (
- exclusions.skip_if(
- ["oracle"],
- "works, but Oracle just gets tired with "
- "this much connection activity",
- )
- + skip_if(self._sqlite_file_db)
- )
+ return skip_if(self._sqlite_file_db)
@property
def no_asyncio(self):