]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Add new "sync once" mode for pool.connect
authorMike Bayer <mike_mp@zzzcomputing.com>
Wed, 21 Apr 2021 18:44:45 +0000 (14:44 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Thu, 22 Apr 2021 02:31:42 +0000 (22:31 -0400)
Fixed critical regression caused by the change in :ticket:`5497` where the
connection pool "init" phase no longer occurred within mutexed isolation,
allowing other threads to proceed with the dialect uninitialized, which
could then impact the compilation of SQL statements.

This issue is essentially the same regression which was fixed many years
ago in :ticket:`2964` in dd32540dabbee0678530fb1b0868d1eb41572dca,
which was missed this time as the test suite for
that issue only tested the pool in isolation, and assumed the
"first_connect" event would be used by the Engine.  However
:ticket:`5497` stopped using "first_connect" and no test detected
the lack of mutexing; that has been resolved here through
the addition of more tests.

This fix also identifies what is probably a bug in earlier versions
of SQLAlchemy where the "first_connect" handler would be cancelled
if the initializer failed; this is evidenced by
test_explode_in_initializer which was doing a reconnect due to
c.rollback() yet wasn't hanging.  We now solve this issue by
preventing the manufactured Connection from ever reconnecting
inside the first_connect handler.

Also remove the "_sqla_unwrap" test attribute; this is almost
not used anymore however we can use a more targeted
wrapper supplied by the testing.engines.proxying_engine
function.

See if we can also open up Oracle for "ad hoc engines" tests
now that we have better connection management logic.

Fixes: #6337
Change-Id: I4a3476625c4606f1a304dbc940d500325e8adc1a

13 files changed:
doc/build/changelog/unreleased_14/6337.rst [new file with mode: 0644]
lib/sqlalchemy/engine/base.py
lib/sqlalchemy/engine/create.py
lib/sqlalchemy/event/attr.py
lib/sqlalchemy/pool/base.py
lib/sqlalchemy/testing/engines.py
test/aaa_profiling/test_pool.py
test/engine/test_execute.py
test/engine/test_pool.py
test/engine/test_reconnect.py
test/engine/test_transaction.py
test/profiles.txt
test/requirements.py

diff --git a/doc/build/changelog/unreleased_14/6337.rst b/doc/build/changelog/unreleased_14/6337.rst
new file mode 100644 (file)
index 0000000..292850f
--- /dev/null
@@ -0,0 +1,9 @@
+.. change::
+    :tags: bug, engine, regression
+    :tickets: 6337
+
+    Fixed critical regression caused by the change in :ticket:`5497` where the
+    connection pool "init" phase no longer occurred within mutexed isolation,
+    allowing other threads to proceed with the dialect uninitialized, which
+    could then impact the compilation of SQL statements.
+
index 20d11e578ef12e5531f4d6a9132a0e91ccfa71de..e0ebb4a989dd3cf3afae2fa9efc20e662418432a 100644 (file)
@@ -69,6 +69,7 @@ class Connection(Connectable):
         _execution_options=None,
         _dispatch=None,
         _has_events=None,
+        _allow_revalidate=True,
     ):
         """Construct a new Connection."""
         self.engine = engine
@@ -96,7 +97,7 @@ class Connection(Connectable):
             self.__in_begin = False
             self.should_close_with_result = close_with_result
 
-            self.__can_reconnect = True
+            self.__can_reconnect = _allow_revalidate
             self._echo = self.engine._should_log_info()
 
             if _has_events is None:
index 682d0dd5d3056ea5100f04daf6aaf79d16138e57..0351f2ebcca2d417bf79dc86485cf3b34d49f31f 100644 (file)
@@ -626,7 +626,10 @@ def create_engine(url, **kwargs):
         if k in kwargs:
             engine_args[k] = pop_kwarg(k)
 
+    # internal flags used by the test suite for instrumenting / proxying
+    # engines with mocks etc.
     _initialize = kwargs.pop("_initialize", True)
+    _wrap_do_on_connect = kwargs.pop("_wrap_do_on_connect", None)
 
     # all kwargs should be consumed
     if kwargs:
@@ -646,30 +649,38 @@ def create_engine(url, **kwargs):
     engine = engineclass(pool, dialect, u, **engine_args)
 
     if _initialize:
+
         do_on_connect = dialect.on_connect_url(url)
         if do_on_connect:
+            if _wrap_do_on_connect:
+                do_on_connect = _wrap_do_on_connect(do_on_connect)
 
             def on_connect(dbapi_connection, connection_record):
-                conn = getattr(
-                    dbapi_connection, "_sqla_unwrap", dbapi_connection
-                )
-                if conn is None:
-                    return
-
-                do_on_connect(conn)
+                do_on_connect(dbapi_connection)
 
             event.listen(pool, "connect", on_connect)
 
         def first_connect(dbapi_connection, connection_record):
             c = base.Connection(
-                engine, connection=dbapi_connection, _has_events=False
+                engine,
+                connection=dbapi_connection,
+                _has_events=False,
+                # reconnecting will be a reentrant condition, so if the
+                # connection goes away, Connection is then closed
+                _allow_revalidate=False,
             )
             c._execution_options = util.EMPTY_DICT
 
             try:
                 dialect.initialize(c)
             finally:
-                dialect.do_rollback(c.connection)
+                # note that "invalidated" and "closed" are mutually
+                # exclusive in 1.4 Connection.
+                if not c.invalidated and not c.closed:
+                    # transaction is rolled back otherwise, tested by
+                    # test/dialect/postgresql/test_dialect.py
+                    # ::MiscBackendTest::test_initial_transaction_state
+                    dialect.do_rollback(c.connection)
 
         # previously, the "first_connect" event was used here, which was then
         # scaled back if the "on_connect" handler were present.  now,
index 245eaab60a3dbc4584859e8f7c1c8b5eab11aad5..e428d2635fb03dea3bebedbfba917b1f216c03bf 100644 (file)
@@ -268,7 +268,7 @@ class _EmptyListener(_InstanceLevelDispatch):
 
 
 class _CompoundListener(_InstanceLevelDispatch):
-    __slots__ = "_exec_once_mutex", "_exec_once"
+    __slots__ = "_exec_once_mutex", "_exec_once", "_exec_w_sync_once"
 
     def _set_asyncio(self):
         self._exec_once_mutex = AsyncAdaptedLock()
@@ -311,6 +311,29 @@ class _CompoundListener(_InstanceLevelDispatch):
         if not self._exec_once:
             self._exec_once_impl(True, *args, **kw)
 
+    def _exec_w_sync_on_first_run(self, *args, **kw):
+        """Execute this event, and use a mutex if it has not been
+        executed already for this collection, or was called
+        by a previous _exec_w_sync_on_first_run call and
+        raised an exception.
+
+        If _exec_w_sync_on_first_run was already called and didn't raise an
+        exception, then a mutex is not used.
+
+        .. versionadded:: 1.4.11
+
+        """
+        if not self._exec_w_sync_once:
+            with self._exec_once_mutex:
+                try:
+                    self(*args, **kw)
+                except:
+                    raise
+                else:
+                    self._exec_w_sync_once = True
+        else:
+            self(*args, **kw)
+
     def __call__(self, *args, **kw):
         """Execute this event."""
 
@@ -354,6 +377,7 @@ class _ListenerCollection(_CompoundListener):
         if target_cls not in parent._clslevel:
             parent.update_subclass(target_cls)
         self._exec_once = False
+        self._exec_w_sync_once = False
         self.parent_listeners = parent._clslevel[target_cls]
         self.parent = parent
         self.name = parent.name
index d14316fdbeabfd9901d6c4a47b702d5d490a9665..e2ed538003762005493378f0b5e365d6343a73e6 100644 (file)
@@ -359,7 +359,7 @@ class _ConnectionRecord(object):
     def __init__(self, pool, connect=True):
         self.__pool = pool
         if connect:
-            self.__connect(first_connect_check=True)
+            self.__connect()
         self.finalize_callback = deque()
 
     fresh = False
@@ -588,7 +588,7 @@ class _ConnectionRecord(object):
         self.__pool._close_connection(self.connection)
         self.connection = None
 
-    def __connect(self, first_connect_check=False):
+    def __connect(self):
         pool = self.__pool
 
         # ensure any existing connection is removed, so that if
@@ -604,12 +604,18 @@ class _ConnectionRecord(object):
             with util.safe_reraise():
                 pool.logger.debug("Error on connect(): %s", e)
         else:
-            if first_connect_check:
+            # in SQLAlchemy 1.4 the first_connect event is not used by
+            # the engine, so this will usually not be set
+            if pool.dispatch.first_connect:
                 pool.dispatch.first_connect.for_modify(
                     pool.dispatch
                 ).exec_once_unless_exception(self.connection, self)
-            if pool.dispatch.connect:
-                pool.dispatch.connect(self.connection, self)
+
+            # init of the dialect now takes place within the connect
+            # event, so ensure a mutex is used on the first run
+            pool.dispatch.connect.for_modify(
+                pool.dispatch
+            )._exec_w_sync_on_first_run(self.connection, self)
 
 
 def _finalize_fairy(
index 3faf9685757fdfecc9a7932400863af5feaa9066..1d740b4f3bfcc0eae0c0fcfe182adc3f182d0ca0 100644 (file)
@@ -405,7 +405,7 @@ class DBAPIProxyConnection(object):
     """
 
     def __init__(self, engine, cursor_cls):
-        self.conn = self._sqla_unwrap = engine.pool._creator()
+        self.conn = engine.pool._creator()
         self.engine = engine
         self.cursor_cls = cursor_cls
 
@@ -430,4 +430,15 @@ def proxying_engine(
     def mock_conn():
         return conn_cls(config.db, cursor_cls)
 
-    return testing_engine(options={"creator": mock_conn})
+    def _wrap_do_on_connect(do_on_connect):
+        def go(dbapi_conn):
+            return do_on_connect(dbapi_conn.conn)
+
+        return go
+
+    return testing_engine(
+        options={
+            "creator": mock_conn,
+            "_wrap_do_on_connect": _wrap_do_on_connect,
+        }
+    )
index da3c1c52560fcf5f114d54c5d956462673b40f8f..2b1a490c24f1121ca322434842e99c92917a7699 100644 (file)
@@ -1,9 +1,9 @@
+from sqlalchemy import event
 from sqlalchemy.pool import QueuePool
 from sqlalchemy.testing import AssertsExecutionResults
 from sqlalchemy.testing import fixtures
 from sqlalchemy.testing import profiling
 
-
 pool = None
 
 
@@ -28,6 +28,11 @@ class QueuePoolTest(fixtures.TestBase, AssertsExecutionResults):
         global pool
         pool = QueuePool(creator=self.Connection, pool_size=3, max_overflow=-1)
 
+        # make this a real world case where we have a "connect" handler
+        @event.listens_for(pool, "connect")
+        def do_connect(dbapi_conn, conn_record):
+            pass
+
     @profiling.function_call_count()
     def test_first_connect(self):
         pool.connect()
index 0b65b3055c6f693135ed462ec1127edeaec46a22..f4449383a9e99f4cb2251a13d0229988b6811b94 100644 (file)
@@ -2,6 +2,7 @@
 
 from contextlib import contextmanager
 import re
+import threading
 import weakref
 
 import sqlalchemy as tsa
@@ -26,6 +27,7 @@ from sqlalchemy import VARCHAR
 from sqlalchemy.engine import default
 from sqlalchemy.engine.base import Connection
 from sqlalchemy.engine.base import Engine
+from sqlalchemy.pool import NullPool
 from sqlalchemy.pool import QueuePool
 from sqlalchemy.sql import column
 from sqlalchemy.sql import literal
@@ -2812,7 +2814,7 @@ class HandleErrorTest(fixtures.TestBase):
         conn.close()
 
 
-class HandleInvalidatedOnConnectTest(fixtures.TestBase):
+class OnConnectTest(fixtures.TestBase):
     __requires__ = ("sqlite",)
 
     def setup_test(self):
@@ -3090,6 +3092,63 @@ class HandleInvalidatedOnConnectTest(fixtures.TestBase):
         c.close()
         c2.close()
 
+    @testing.only_on("sqlite+pysqlite")
+    def test_initialize_connect_race(self):
+        """test for :ticket:`6337` fixing the regression in :ticket:`5497`,
+        dialect init is mutexed"""
+
+        m1 = []
+        cls_ = testing.db.dialect.__class__
+
+        class SomeDialect(cls_):
+            def initialize(self, connection):
+                super(SomeDialect, self).initialize(connection)
+                m1.append("initialize")
+
+            def on_connect(self):
+                oc = super(SomeDialect, self).on_connect()
+
+                def my_on_connect(conn):
+                    if oc:
+                        oc(conn)
+                    m1.append("on_connect")
+
+                return my_on_connect
+
+        u1 = Mock(
+            username=None,
+            password=None,
+            host=None,
+            port=None,
+            query={},
+            database=None,
+            _instantiate_plugins=lambda kw: (u1, [], kw),
+            _get_entrypoint=Mock(
+                return_value=Mock(get_dialect_cls=lambda u: SomeDialect)
+            ),
+        )
+
+        for j in range(5):
+            m1[:] = []
+            eng = create_engine(
+                u1,
+                poolclass=NullPool,
+                connect_args={"check_same_thread": False},
+            )
+
+            def go():
+                c = eng.connect()
+                c.execute(text("select 1"))
+                c.close()
+
+            threads = [threading.Thread(target=go) for i in range(10)]
+            for t in threads:
+                t.start()
+            for t in threads:
+                t.join()
+
+            eq_(m1, ["on_connect", "initialize"] + ["on_connect"] * 9)
+
 
 class DialectEventTest(fixtures.TestBase):
     @contextmanager
index 1c388050475bd1bdb984e23752da29323d62e673..5b6dcfa45cbb557cefd31116be137ffe66e64d21 100644 (file)
@@ -780,7 +780,22 @@ class PoolEventsTest(PoolTestBase):
 
 
 class PoolFirstConnectSyncTest(PoolTestBase):
-    # test [ticket:2964]
+    """test for :ticket:`2964`, where the pool would not mutex the
+    initialization of the dialect.
+
+    Unfortunately, as discussed in :ticket:`6337`, this test suite did not
+    ensure that the ``Engine`` itself actually uses the "first_connect" event,
+    so when :ticket:`5497` came along, the "first_connect" event was no longer
+    used and no test detected the re-introduction of the exact same race
+    condition, which was now worse as the un-initialized dialect would now
+    pollute the SQL cache causing the application to not work at all.
+
+    A new suite has therefore been added in test/engine/test_execute.py->
+    OnConnectTest::test_initialize_connect_race to ensure that the engine
+    in total synchronizes the "first_connect" process, which now works
+    using a new events feature _exec_w_sync_on_first_run.
+
+    """
 
     @testing.requires.timing_intensive
     def test_sync(self):
index 04cf5440cf887929b13a7cc4ce8acc047081a8b4..70c4c51906406f213008f4f14ab1428f5df49ae4 100644 (file)
@@ -1144,9 +1144,6 @@ class RealReconnectTest(fixtures.TestBase):
             conn.invalidate()
             conn.invalidate()
 
-    @testing.skip_if(
-        [lambda: util.py3k, "oracle+cx_oracle"], "Crashes on py3k+cx_oracle"
-    )
     def test_explode_in_initializer(self):
         engine = engines.testing_engine()
 
@@ -1158,9 +1155,6 @@ class RealReconnectTest(fixtures.TestBase):
         # raises a DBAPIError, not an AttributeError
         assert_raises(exc.DBAPIError, engine.connect)
 
-    @testing.skip_if(
-        [lambda: util.py3k, "oracle+cx_oracle"], "Crashes on py3k+cx_oracle"
-    )
     def test_explode_in_initializer_disconnect(self):
         engine = engines.testing_engine()
 
index d972bcb73a529287c3c525ef697523afc97bfabc..4751963e4f711fcaeca6b1147d3c718edb4fe45f 100644 (file)
@@ -626,7 +626,11 @@ class AutoRollbackTest(fixtures.TestBase):
 
 
 class IsolationLevelTest(fixtures.TestBase):
-    __requires__ = ("isolation_level", "ad_hoc_engines")
+    __requires__ = (
+        "isolation_level",
+        "ad_hoc_engines",
+        "legacy_isolation_level",
+    )
     __backend__ = True
 
     def _default_isolation_level(self):
index f1dab165e0ac7e47c907e5541c698667571a6934..1a271596d182ee9c74ac945a2f4a550eff18a115 100644 (file)
@@ -1,15 +1,15 @@
 # /home/classic/dev/sqlalchemy/test/profiles.txt
 # This file is written out on a per-environment basis.
-# For each test in aaa_profiling, the corresponding function and 
+# For each test in aaa_profiling, the corresponding function and
 # environment is located within this file.  If it doesn't exist,
 # the test is skipped.
-# If a callcount does exist, it is compared to what we received. 
+# If a callcount does exist, it is compared to what we received.
 # assertions are raised if the counts do not match.
-# 
-# To add a new callcount test, apply the function_call_count 
-# decorator and re-run the tests using the --write-profiles 
+#
+# To add a new callcount test, apply the function_call_count
+# decorator and re-run the tests using the --write-profiles
 # option - this file will be rewritten including the new count.
-# 
+#
 
 # TEST: test.aaa_profiling.test_compiler.CompileTest.test_insert
 
@@ -498,12 +498,12 @@ test.aaa_profiling.test_orm.SessionTest.test_expire_lots x86_64_linux_cpython_3.
 
 # TEST: test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect
 
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_2.7_sqlite_pysqlite_dbapiunicode_cextensions 102
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_2.7_sqlite_pysqlite_dbapiunicode_nocextensions 102
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.8_sqlite_pysqlite_dbapiunicode_cextensions 84
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.8_sqlite_pysqlite_dbapiunicode_nocextensions 84
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.9_sqlite_pysqlite_dbapiunicode_cextensions 86
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.9_sqlite_pysqlite_dbapiunicode_nocextensions 86
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_2.7_sqlite_pysqlite_dbapiunicode_cextensions 90
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_2.7_sqlite_pysqlite_dbapiunicode_nocextensions 90
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.8_sqlite_pysqlite_dbapiunicode_cextensions 72
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.8_sqlite_pysqlite_dbapiunicode_nocextensions 72
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.9_sqlite_pysqlite_dbapiunicode_cextensions 74
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect x86_64_linux_cpython_3.9_sqlite_pysqlite_dbapiunicode_nocextensions 74
 
 # TEST: test.aaa_profiling.test_pool.QueuePoolTest.test_second_connect
 
index 29dd55b45004bab5ba12f8eff2efa8cb248939a7..98dca6124986cc8ef044a04e8db95446a8cefe12 100644 (file)
@@ -407,6 +407,17 @@ class DefaultRequirements(SuiteRequirements):
             "pypostgresql bombs on multiple isolation level calls",
         )
 
+    @property
+    def legacy_isolation_level(self):
+        # refers to the engine isolation_level setting
+        return only_on(
+            ("postgresql", "sqlite", "mysql", "mariadb", "mssql"),
+            "DBAPI has no isolation level support",
+        ) + fails_on(
+            "postgresql+pypostgresql",
+            "pypostgresql bombs on multiple isolation level calls",
+        )
+
     def get_isolation_levels(self, config):
         levels = set(config.db.dialect._isolation_lookup)
 
@@ -1458,14 +1469,7 @@ class DefaultRequirements(SuiteRequirements):
 
     @property
     def ad_hoc_engines(self):
-        return (
-            exclusions.skip_if(
-                ["oracle"],
-                "works, but Oracle just gets tired with "
-                "this much connection activity",
-            )
-            + skip_if(self._sqlite_file_db)
-        )
+        return skip_if(self._sqlite_file_db)
 
     @property
     def no_asyncio(self):