From: Mike Bayer Date: Wed, 21 Apr 2021 18:44:45 +0000 (-0400) Subject: Add new "sync once" mode for pool.connect X-Git-Tag: rel_1_4_11~3^2 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=37414a752b0036334d0f31ac8cd3aff749c3898b;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git Add new "sync once" mode for pool.connect Fixed critical regression caused by the change in :ticket:`5497` where the connection pool "init" phase no longer occurred within mutexed isolation, allowing other threads to proceed with the dialect uninitialized, which could then impact the compilation of SQL statements. This issue is essentially the same regression which was fixed many years ago in :ticket:`2964` in dd32540dabbee0678530fb1b0868d1eb41572dca, which was missed this time as the test suite for that issue only tested the pool in isolation, and assumed the "first_connect" event would be used by the Engine. However :ticket:`5497` stopped using "first_connect" and no test detected the lack of mutexing; that has been resolved here through the addition of more tests. This fix also identifies what is probably a bug in earlier versions of SQLAlchemy where the "first_connect" handler would be cancelled if the initializer failed; this is evidenced by test_explode_in_initializer which was doing a reconnect due to c.rollback() yet wasn't hanging. We now solve this issue by preventing the manufactured Connection from ever reconnecting inside the first_connect handler. Also remove the "_sqla_unwrap" test attribute; this is almost not used anymore however we can use a more targeted wrapper supplied by the testing.engines.proxying_engine function. See if we can also open up Oracle for "ad hoc engines" tests now that we have better connection management logic. 
Fixes: #6337 Change-Id: I4a3476625c4606f1a304dbc940d500325e8adc1a --- diff --git a/doc/build/changelog/unreleased_14/6337.rst b/doc/build/changelog/unreleased_14/6337.rst new file mode 100644 index 0000000000..292850f428 --- /dev/null +++ b/doc/build/changelog/unreleased_14/6337.rst @@ -0,0 +1,9 @@ +.. change:: + :tags: bug, engine, regression + :tickets: 6337 + + Fixed critical regression caused by the change in :ticket:`5497` where the + connection pool "init" phase no longer occurred within mutexed isolation, + allowing other threads to proceed with the dialect uninitialized, which + could then impact the compilation of SQL statements. + diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index 20d11e578e..e0ebb4a989 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -69,6 +69,7 @@ class Connection(Connectable): _execution_options=None, _dispatch=None, _has_events=None, + _allow_revalidate=True, ): """Construct a new Connection.""" self.engine = engine @@ -96,7 +97,7 @@ class Connection(Connectable): self.__in_begin = False self.should_close_with_result = close_with_result - self.__can_reconnect = True + self.__can_reconnect = _allow_revalidate self._echo = self.engine._should_log_info() if _has_events is None: diff --git a/lib/sqlalchemy/engine/create.py b/lib/sqlalchemy/engine/create.py index 682d0dd5d3..0351f2ebcc 100644 --- a/lib/sqlalchemy/engine/create.py +++ b/lib/sqlalchemy/engine/create.py @@ -626,7 +626,10 @@ def create_engine(url, **kwargs): if k in kwargs: engine_args[k] = pop_kwarg(k) + # internal flags used by the test suite for instrumenting / proxying + # engines with mocks etc. 
_initialize = kwargs.pop("_initialize", True) + _wrap_do_on_connect = kwargs.pop("_wrap_do_on_connect", None) # all kwargs should be consumed if kwargs: @@ -646,30 +649,38 @@ def create_engine(url, **kwargs): engine = engineclass(pool, dialect, u, **engine_args) if _initialize: + do_on_connect = dialect.on_connect_url(url) if do_on_connect: + if _wrap_do_on_connect: + do_on_connect = _wrap_do_on_connect(do_on_connect) def on_connect(dbapi_connection, connection_record): - conn = getattr( - dbapi_connection, "_sqla_unwrap", dbapi_connection - ) - if conn is None: - return - - do_on_connect(conn) + do_on_connect(dbapi_connection) event.listen(pool, "connect", on_connect) def first_connect(dbapi_connection, connection_record): c = base.Connection( - engine, connection=dbapi_connection, _has_events=False + engine, + connection=dbapi_connection, + _has_events=False, + # reconnecting will be a reentrant condition, so if the + # connection goes away, Connection is then closed + _allow_revalidate=False, ) c._execution_options = util.EMPTY_DICT try: dialect.initialize(c) finally: - dialect.do_rollback(c.connection) + # note that "invalidated" and "closed" are mutually + # exclusive in 1.4 Connection. + if not c.invalidated and not c.closed: + # transaction is rolled back otherwise, tested by + # test/dialect/postgresql/test_dialect.py + # ::MiscBackendTest::test_initial_transaction_state + dialect.do_rollback(c.connection) # previously, the "first_connect" event was used here, which was then # scaled back if the "on_connect" handler were present. 
now, diff --git a/lib/sqlalchemy/event/attr.py b/lib/sqlalchemy/event/attr.py index 245eaab60a..e428d2635f 100644 --- a/lib/sqlalchemy/event/attr.py +++ b/lib/sqlalchemy/event/attr.py @@ -268,7 +268,7 @@ class _EmptyListener(_InstanceLevelDispatch): class _CompoundListener(_InstanceLevelDispatch): - __slots__ = "_exec_once_mutex", "_exec_once" + __slots__ = "_exec_once_mutex", "_exec_once", "_exec_w_sync_once" def _set_asyncio(self): self._exec_once_mutex = AsyncAdaptedLock() @@ -311,6 +311,29 @@ class _CompoundListener(_InstanceLevelDispatch): if not self._exec_once: self._exec_once_impl(True, *args, **kw) + def _exec_w_sync_on_first_run(self, *args, **kw): + """Execute this event, and use a mutex if it has not been + executed already for this collection, or was called + by a previous _exec_w_sync_on_first_run call and + raised an exception. + + If _exec_w_sync_on_first_run was already called and didn't raise an + exception, then a mutex is not used. + + .. versionadded:: 1.4.11 + + """ + if not self._exec_w_sync_once: + with self._exec_once_mutex: + try: + self(*args, **kw) + except: + raise + else: + self._exec_w_sync_once = True + else: + self(*args, **kw) + def __call__(self, *args, **kw): """Execute this event.""" @@ -354,6 +377,7 @@ class _ListenerCollection(_CompoundListener): if target_cls not in parent._clslevel: parent.update_subclass(target_cls) self._exec_once = False + self._exec_w_sync_once = False self.parent_listeners = parent._clslevel[target_cls] self.parent = parent self.name = parent.name diff --git a/lib/sqlalchemy/pool/base.py b/lib/sqlalchemy/pool/base.py index d14316fdbe..e2ed538003 100644 --- a/lib/sqlalchemy/pool/base.py +++ b/lib/sqlalchemy/pool/base.py @@ -359,7 +359,7 @@ class _ConnectionRecord(object): def __init__(self, pool, connect=True): self.__pool = pool if connect: - self.__connect(first_connect_check=True) + self.__connect() self.finalize_callback = deque() fresh = False @@ -588,7 +588,7 @@ class _ConnectionRecord(object): 
self.__pool._close_connection(self.connection) self.connection = None - def __connect(self, first_connect_check=False): + def __connect(self): pool = self.__pool # ensure any existing connection is removed, so that if @@ -604,12 +604,18 @@ class _ConnectionRecord(object): with util.safe_reraise(): pool.logger.debug("Error on connect(): %s", e) else: - if first_connect_check: + # in SQLAlchemy 1.4 the first_connect event is not used by + # the engine, so this will usually not be set + if pool.dispatch.first_connect: pool.dispatch.first_connect.for_modify( pool.dispatch ).exec_once_unless_exception(self.connection, self) - if pool.dispatch.connect: - pool.dispatch.connect(self.connection, self) + + # init of the dialect now takes place within the connect + # event, so ensure a mutex is used on the first run + pool.dispatch.connect.for_modify( + pool.dispatch + )._exec_w_sync_on_first_run(self.connection, self) def _finalize_fairy( diff --git a/lib/sqlalchemy/testing/engines.py b/lib/sqlalchemy/testing/engines.py index 3faf968575..1d740b4f3b 100644 --- a/lib/sqlalchemy/testing/engines.py +++ b/lib/sqlalchemy/testing/engines.py @@ -405,7 +405,7 @@ class DBAPIProxyConnection(object): """ def __init__(self, engine, cursor_cls): - self.conn = self._sqla_unwrap = engine.pool._creator() + self.conn = engine.pool._creator() self.engine = engine self.cursor_cls = cursor_cls @@ -430,4 +430,15 @@ def proxying_engine( def mock_conn(): return conn_cls(config.db, cursor_cls) - return testing_engine(options={"creator": mock_conn}) + def _wrap_do_on_connect(do_on_connect): + def go(dbapi_conn): + return do_on_connect(dbapi_conn.conn) + + return go + + return testing_engine( + options={ + "creator": mock_conn, + "_wrap_do_on_connect": _wrap_do_on_connect, + } + ) diff --git a/test/aaa_profiling/test_pool.py b/test/aaa_profiling/test_pool.py index da3c1c5256..2b1a490c24 100644 --- a/test/aaa_profiling/test_pool.py +++ b/test/aaa_profiling/test_pool.py @@ -1,9 +1,9 @@ +from sqlalchemy 
import event from sqlalchemy.pool import QueuePool from sqlalchemy.testing import AssertsExecutionResults from sqlalchemy.testing import fixtures from sqlalchemy.testing import profiling - pool = None @@ -28,6 +28,11 @@ class QueuePoolTest(fixtures.TestBase, AssertsExecutionResults): global pool pool = QueuePool(creator=self.Connection, pool_size=3, max_overflow=-1) + # make this a real world case where we have a "connect" handler + @event.listens_for(pool, "connect") + def do_connect(dbapi_conn, conn_record): + pass + @profiling.function_call_count() def test_first_connect(self): pool.connect() diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index 0b65b3055c..f4449383a9 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -2,6 +2,7 @@ from contextlib import contextmanager import re +import threading import weakref import sqlalchemy as tsa @@ -26,6 +27,7 @@ from sqlalchemy import VARCHAR from sqlalchemy.engine import default from sqlalchemy.engine.base import Connection from sqlalchemy.engine.base import Engine +from sqlalchemy.pool import NullPool from sqlalchemy.pool import QueuePool from sqlalchemy.sql import column from sqlalchemy.sql import literal @@ -2812,7 +2814,7 @@ class HandleErrorTest(fixtures.TestBase): conn.close() -class HandleInvalidatedOnConnectTest(fixtures.TestBase): +class OnConnectTest(fixtures.TestBase): __requires__ = ("sqlite",) def setup_test(self): @@ -3090,6 +3092,63 @@ class HandleInvalidatedOnConnectTest(fixtures.TestBase): c.close() c2.close() + @testing.only_on("sqlite+pysqlite") + def test_initialize_connect_race(self): + """test for :ticket:`6337` fixing the regression in :ticket:`5497`, + dialect init is mutexed""" + + m1 = [] + cls_ = testing.db.dialect.__class__ + + class SomeDialect(cls_): + def initialize(self, connection): + super(SomeDialect, self).initialize(connection) + m1.append("initialize") + + def on_connect(self): + oc = super(SomeDialect, self).on_connect() + + def 
my_on_connect(conn): + if oc: + oc(conn) + m1.append("on_connect") + + return my_on_connect + + u1 = Mock( + username=None, + password=None, + host=None, + port=None, + query={}, + database=None, + _instantiate_plugins=lambda kw: (u1, [], kw), + _get_entrypoint=Mock( + return_value=Mock(get_dialect_cls=lambda u: SomeDialect) + ), + ) + + for j in range(5): + m1[:] = [] + eng = create_engine( + u1, + poolclass=NullPool, + connect_args={"check_same_thread": False}, + ) + + def go(): + c = eng.connect() + c.execute(text("select 1")) + c.close() + + threads = [threading.Thread(target=go) for i in range(10)] + for t in threads: + t.start() + for t in threads: + t.join() + + eq_(m1, ["on_connect", "initialize"] + ["on_connect"] * 9) + class DialectEventTest(fixtures.TestBase): @contextmanager diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py index 1c38805047..5b6dcfa45c 100644 --- a/test/engine/test_pool.py +++ b/test/engine/test_pool.py @@ -780,7 +780,22 @@ class PoolEventsTest(PoolTestBase): class PoolFirstConnectSyncTest(PoolTestBase): - # test [ticket:2964] + """test for :ticket:`2964`, where the pool would not mutex the + initialization of the dialect. + + Unfortunately, as discussed in :ticket:`6337`, this test suite did not + ensure that the ``Engine`` itself actually uses the "first_connect" event, + so when :ticket:`5497` came along, the "first_connect" event was no longer + used and no test detected the re-introduction of the exact same race + condition, which was now worse as the un-initialized dialect would now + pollute the SQL cache causing the application to not work at all. + + A new suite has therefore been added in test/engine/test_execute.py-> + OnConnectTest::test_initialize_connect_race to ensure that the engine + in total synchronizes the "first_connect" process, which now works + using a new events feature _exec_w_sync_on_first_run. 
+ + """ @testing.requires.timing_intensive def test_sync(self): diff --git a/test/engine/test_reconnect.py b/test/engine/test_reconnect.py index 04cf5440cf..70c4c51906 100644 --- a/test/engine/test_reconnect.py +++ b/test/engine/test_reconnect.py @@ -1144,9 +1144,6 @@ class RealReconnectTest(fixtures.TestBase): conn.invalidate() conn.invalidate() - @testing.skip_if( - [lambda: util.py3k, "oracle+cx_oracle"], "Crashes on py3k+cx_oracle" - ) def test_explode_in_initializer(self): engine = engines.testing_engine() @@ -1158,9 +1155,6 @@ class RealReconnectTest(fixtures.TestBase): # raises a DBAPIError, not an AttributeError assert_raises(exc.DBAPIError, engine.connect) - @testing.skip_if( - [lambda: util.py3k, "oracle+cx_oracle"], "Crashes on py3k+cx_oracle" - ) def test_explode_in_initializer_disconnect(self): engine = engines.testing_engine() diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py index d972bcb73a..4751963e4f 100644 --- a/test/engine/test_transaction.py +++ b/test/engine/test_transaction.py @@ -626,7 +626,11 @@ class AutoRollbackTest(fixtures.TestBase): class IsolationLevelTest(fixtures.TestBase): - __requires__ = ("isolation_level", "ad_hoc_engines") + __requires__ = ( + "isolation_level", + "ad_hoc_engines", + "legacy_isolation_level", + ) __backend__ = True def _default_isolation_level(self): diff --git a/test/profiles.txt b/test/profiles.txt index f1dab165e0..1a271596d1 100644 --- a/test/profiles.txt +++ b/test/profiles.txt @@ -1,15 +1,15 @@ # /home/classic/dev/sqlalchemy/test/profiles.txt # This file is written out on a per-environment basis. -# For each test in aaa_profiling, the corresponding function and +# For each test in aaa_profiling, the corresponding function and # environment is located within this file. If it doesn't exist, # the test is skipped. -# If a callcount does exist, it is compared to what we received. +# If a callcount does exist, it is compared to what we received. 
# assertions are raised if the counts do not match. -# -# To add a new callcount test, apply the function_call_count -# decorator and re-run the tests using the --write-profiles +# +# To add a new callcount test, apply the function_call_count +# decorator and re-run the tests using the --write-profiles # option - this file will be rewritten including the new count. -# +# # TEST: test.aaa_profiling.test_compiler.CompileTest.test_insert @@ -498,12 +498,12 @@ test.aaa_profiling.test_orm.SessionTest.test_expire_lots x86_64_linux_cpython_3. # TEST: test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect -test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_2.7_sqlite_pysqlite_dbapiunicode_cextensions 102 -test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_2.7_sqlite_pysqlite_dbapiunicode_nocextensions 102 -test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.8_sqlite_pysqlite_dbapiunicode_cextensions 84 -test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.8_sqlite_pysqlite_dbapiunicode_nocextensions 84 -test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.9_sqlite_pysqlite_dbapiunicode_cextensions 86 -test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.9_sqlite_pysqlite_dbapiunicode_nocextensions 86 +test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_2.7_sqlite_pysqlite_dbapiunicode_cextensions 90 +test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_2.7_sqlite_pysqlite_dbapiunicode_nocextensions 90 +test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.8_sqlite_pysqlite_dbapiunicode_cextensions 72 +test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.8_sqlite_pysqlite_dbapiunicode_nocextensions 72 
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.9_sqlite_pysqlite_dbapiunicode_cextensions 74 +test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.9_sqlite_pysqlite_dbapiunicode_nocextensions 74 # TEST: test.aaa_profiling.test_pool.QueuePoolTest.test_second_connect diff --git a/test/requirements.py b/test/requirements.py index 29dd55b450..98dca61249 100644 --- a/test/requirements.py +++ b/test/requirements.py @@ -407,6 +407,17 @@ class DefaultRequirements(SuiteRequirements): "pypostgresql bombs on multiple isolation level calls", ) + @property + def legacy_isolation_level(self): + # refers to the engine isolation_level setting + return only_on( + ("postgresql", "sqlite", "mysql", "mariadb", "mssql"), + "DBAPI has no isolation level support", + ) + fails_on( + "postgresql+pypostgresql", + "pypostgresql bombs on multiple isolation level calls", + ) + def get_isolation_levels(self, config): levels = set(config.db.dialect._isolation_lookup) @@ -1458,14 +1469,7 @@ class DefaultRequirements(SuiteRequirements): @property def ad_hoc_engines(self): - return ( - exclusions.skip_if( - ["oracle"], - "works, but Oracle just gets tired with " - "this much connection activity", - ) - + skip_if(self._sqlite_file_db) - ) + return skip_if(self._sqlite_file_db) @property def no_asyncio(self):