From: Mike Bayer
Date: Tue, 17 Nov 2020 22:13:24 +0000 (-0500)
Subject: Support pool.connect() event firing before all else
X-Git-Tag: rel_1_4_0b2~140^2
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=9b779611f9;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git

Support pool.connect() event firing before all else

Fixed regression where a connection pool event specified with a keyword,
most notably ``insert=True``, would be lost when the event was set up.
This would prevent startup events that need to fire before dialect-level
events from working correctly.

The internal mechanics of the engine connection routine have been altered
so that a user-defined event handler for the
:meth:`_pool.PoolEvents.connect` event, when established using
``insert=True``, is now guaranteed to be invoked **before** any
dialect-specific initialization starts up, most notably when that
initialization does things like detect the default schema name.
Previously, this would occur in most cases but not unconditionally.
A new example is added to the schema documentation illustrating how to
establish the "default schema name" within an on-connect event (upcoming
as part of I882edd5bbe06ee5b4d0a9c148854a57b2bcd4741).

Additional changes to support setting the default schema name:

The Oracle dialect now uses
``select sys_context( 'userenv', 'current_schema' ) from dual`` to get
the default schema name, rather than ``SELECT USER FROM DUAL``, to
accommodate changes to the session-local schema name under Oracle.

Added a read/write ``.autocommit`` attribute to the DBAPI-adaptation
layer for the asyncpg dialect. This is so that when working with
DBAPI-specific schemes that need to use "autocommit" directly with the
DBAPI connection, the same ``.autocommit`` attribute that works with
both psycopg2 and pg8000 is available.

Fixes: #5716
Fixes: #5708
Change-Id: I7dce56b4345ffc720e25e2aaccb7e42bb29e5671
---

diff --git a/doc/build/changelog/unreleased_14/5708.rst b/doc/build/changelog/unreleased_14/5708.rst
new file mode 100644
index 0000000000..ea3023a722
--- /dev/null
+++ b/doc/build/changelog/unreleased_14/5708.rst
@@ -0,0 +1,32 @@
+.. change::
+    :tags: bug, pool
+    :tickets: 5708
+
+    Fixed regression where a connection pool event specified with a keyword,
+    most notably ``insert=True``, would be lost when the event was set up.
+    This would prevent startup events that need to fire before dialect-level
+    events from working correctly.
+
+
+.. change::
+    :tags: usecase, pool
+    :tickets: 5708, 5497
+
+    The internal mechanics of the engine connection routine have been
+    altered so that a user-defined event handler for the
+    :meth:`_pool.PoolEvents.connect` event, when established using
+    ``insert=True``, is now guaranteed to be invoked **before** any
+    dialect-specific initialization starts up, most notably when that
+    initialization does things like detect the default schema name.
+    Previously, this would occur in most cases but not unconditionally.
+    A new example is added to the schema documentation illustrating how to
+    establish the "default schema name" within an on-connect event.
+
+.. change::
+    :tags: usecase, postgresql
+
+    Added a read/write ``.autocommit`` attribute to the DBAPI-adaptation
+    layer for the asyncpg dialect. This is so that when working with
+    DBAPI-specific schemes that need to use "autocommit" directly with the
+    DBAPI connection, the same ``.autocommit`` attribute that works with
+    both psycopg2 and pg8000 is available.
\ No newline at end of file
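The on-connect recipe referenced above looks roughly like the following minimal sketch; the Oracle URL and the ``SOME_SCHEMA`` name are placeholders rather than part of this patch. Because the handler is registered with ``insert=True``, it runs ahead of the dialect's first-connect initialization, so the altered session schema is what gets detected as the default schema name:

    from sqlalchemy import create_engine
    from sqlalchemy import event

    # placeholder URL; any backend with a session-level schema setting works
    engine = create_engine("oracle+cx_oracle://scott:tiger@tnsname")

    @event.listens_for(engine, "connect", insert=True)
    def set_current_schema(dbapi_connection, connection_record):
        # fires before dialect initialization because of insert=True
        cursor = dbapi_connection.cursor()
        cursor.execute("ALTER SESSION SET CURRENT_SCHEMA=SOME_SCHEMA")
        cursor.close()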
diff --git a/doc/build/changelog/unreleased_14/5716.rst b/doc/build/changelog/unreleased_14/5716.rst
new file mode 100644
index 0000000000..fc8eb380d3
--- /dev/null
+++ b/doc/build/changelog/unreleased_14/5716.rst
@@ -0,0 +1,8 @@
+.. change::
+    :tags: bug, oracle
+    :tickets: 5716
+
+    The Oracle dialect now uses
+    ``select sys_context( 'userenv', 'current_schema' ) from dual`` to get
+    the default schema name, rather than ``SELECT USER FROM DUAL``, to
+    accommodate changes to the session-local schema name under Oracle.
\ No newline at end of file
diff --git a/lib/sqlalchemy/dialects/oracle/base.py b/lib/sqlalchemy/dialects/oracle/base.py
index 7bdaafd82d..c0f7aa5ce0 100644
--- a/lib/sqlalchemy/dialects/oracle/base.py
+++ b/lib/sqlalchemy/dialects/oracle/base.py
@@ -1587,7 +1587,9 @@ class OracleDialect(default.DefaultDialect):
 
     def _get_default_schema_name(self, connection):
         return self.normalize_name(
-            connection.exec_driver_sql("SELECT USER FROM DUAL").scalar()
+            connection.exec_driver_sql(
+                "select sys_context( 'userenv', 'current_schema' ) from dual"
+            ).scalar()
         )
 
     def _resolve_synonym(
diff --git a/lib/sqlalchemy/dialects/oracle/provision.py b/lib/sqlalchemy/dialects/oracle/provision.py
index 01854fdce5..d19dfc9fe6 100644
--- a/lib/sqlalchemy/dialects/oracle/provision.py
+++ b/lib/sqlalchemy/dialects/oracle/provision.py
@@ -7,6 +7,7 @@ from ...testing.provision import drop_db
 from ...testing.provision import follower_url_from_main
 from ...testing.provision import log
 from ...testing.provision import run_reap_dbs
+from ...testing.provision import set_default_schema_on_connection
 from ...testing.provision import temp_table_keyword_args
 from ...testing.provision import update_db_opts
 
@@ -106,3 +107,12 @@ def _oracle_temp_table_keyword_args(cfg, eng):
         "prefixes": ["GLOBAL TEMPORARY"],
         "oracle_on_commit": "PRESERVE ROWS",
     }
+
+
+@set_default_schema_on_connection.for_db("oracle")
+def _oracle_set_default_schema_on_connection(
+    cfg, dbapi_connection, schema_name
+):
+    cursor = dbapi_connection.cursor()
+    cursor.execute("ALTER SESSION SET CURRENT_SCHEMA=%s" % schema_name)
+    cursor.close()
diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
index 5690247902..412feda0fd 100644
--- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py
+++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
@@ -491,7 +491,7 @@ class AsyncAdapt_asyncpg_connection:
     def __init__(self, dbapi, connection):
         self.dbapi = dbapi
         self._connection = connection
-        self.isolation_level = "read_committed"
+        self.isolation_level = self._isolation_setting = "read_committed"
         self.readonly = False
         self.deferrable = False
         self._transaction = None
@@ -512,10 +512,21 @@ class AsyncAdapt_asyncpg_connection:
         else:
             raise error
 
+    @property
+    def autocommit(self):
+        return self.isolation_level == "autocommit"
+
+    @autocommit.setter
+    def autocommit(self, value):
+        if value:
+            self.isolation_level = "autocommit"
+        else:
+            self.isolation_level = self._isolation_setting
+
     def set_isolation_level(self, level):
         if self._started:
             self.rollback()
-        self.isolation_level = level
+        self.isolation_level = self._isolation_setting = level
 
     async def _start_transaction(self):
         if self.isolation_level == "autocommit":
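Because psycopg2, pg8000, and now the asyncpg adaptation layer all expose the same read/write ``.autocommit`` attribute, driver-agnostic code such as the provisioning hook in the next hunk can flip it directly on the raw DBAPI connection. A minimal sketch of that pattern (the helper name is illustrative only):

    def exec_outside_of_transaction(dbapi_connection, statement):
        # same code path for psycopg2, pg8000 and the asyncpg adaptation layer
        existing_autocommit = dbapi_connection.autocommit
        dbapi_connection.autocommit = True
        try:
            cursor = dbapi_connection.cursor()
            cursor.execute(statement)
            cursor.close()
        finally:
            dbapi_connection.autocommit = existing_autocommit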
diff --git a/lib/sqlalchemy/dialects/postgresql/provision.py b/lib/sqlalchemy/dialects/postgresql/provision.py
index 6c6dc4be64..9433ec4585 100644
--- a/lib/sqlalchemy/dialects/postgresql/provision.py
+++ b/lib/sqlalchemy/dialects/postgresql/provision.py
@@ -5,6 +5,7 @@ from ... import text
 from ...testing.provision import create_db
 from ...testing.provision import drop_db
 from ...testing.provision import log
+from ...testing.provision import set_default_schema_on_connection
 from ...testing.provision import temp_table_keyword_args
 
 
@@ -64,3 +65,15 @@ def _pg_drop_db(cfg, eng, ident):
 @temp_table_keyword_args.for_db("postgresql")
 def _postgresql_temp_table_keyword_args(cfg, eng):
     return {"prefixes": ["TEMPORARY"]}
+
+
+@set_default_schema_on_connection.for_db("postgresql")
+def _postgresql_set_default_schema_on_connection(
+    cfg, dbapi_connection, schema_name
+):
+    existing_autocommit = dbapi_connection.autocommit
+    dbapi_connection.autocommit = True
+    cursor = dbapi_connection.cursor()
+    cursor.execute("SET SESSION search_path='%s'" % schema_name)
+    cursor.close()
+    dbapi_connection.autocommit = existing_autocommit
diff --git a/lib/sqlalchemy/engine/create.py b/lib/sqlalchemy/engine/create.py
index 7f5b5e8f5d..786f8f5d6a 100644
--- a/lib/sqlalchemy/engine/create.py
+++ b/lib/sqlalchemy/engine/create.py
@@ -657,17 +657,14 @@ def create_engine(url, **kwargs):
             dialect.initialize(c)
             dialect.do_rollback(c.connection)
 
-        if do_on_connect:
-            event.listen(
-                pool, "connect", first_connect, _once_unless_exception=True
-            )
-        else:
-            event.listen(
-                pool,
-                "first_connect",
-                first_connect,
-                _once_unless_exception=True,
-            )
+        # previously, the "first_connect" event was used here, which was then
+        # scaled back if the "on_connect" handler were present. now,
+        # since "on_connect" is virtually always present, just use
+        # "connect" event with once_unless_exception in all cases so that
+        # the connection event flow is consistent in all cases.
+        event.listen(
+            pool, "connect", first_connect, _once_unless_exception=True
+        )
 
     dialect_cls.engine_created(engine)
     if entrypoint is not dialect_cls:
diff --git a/lib/sqlalchemy/pool/events.py b/lib/sqlalchemy/pool/events.py
index 9443877a91..363afdd78f 100644
--- a/lib/sqlalchemy/pool/events.py
+++ b/lib/sqlalchemy/pool/events.py
@@ -58,7 +58,9 @@ class PoolEvents(event.Events):
     def _listen(cls, event_key, **kw):
         target = event_key.dispatch_target
 
-        event_key.base_listen(asyncio=target._is_asyncio)
+        kw.setdefault("asyncio", target._is_asyncio)
+
+        event_key.base_listen(**kw)
 
     def connect(self, dbapi_connection, connection_record):
         """Called at the moment a particular DBAPI connection is first
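With ``PoolEvents._listen()`` now passing keyword arguments through to ``base_listen()``, ``insert=True`` is honored for pool-level listeners as well. A quick sketch of the resulting ordering, using an in-memory SQLite engine and placeholder handler names:

    from sqlalchemy import create_engine
    from sqlalchemy import event

    engine = create_engine("sqlite://")

    def connect_one(dbapi_connection, connection_record):
        print("connect_one")

    def connect_two(dbapi_connection, connection_record):
        print("connect_two")

    event.listen(engine.pool, "connect", connect_one)
    # insert=True places this handler at the front of the listener list
    event.listen(engine.pool, "connect", connect_two, insert=True)

    with engine.connect():
        pass  # prints "connect_two" then "connect_one"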
""" return base_name + + +@register.init +def set_default_schema_on_connection(cfg, dbapi_connection, schema_name): + raise NotImplementedError( + "backend does not implement a schema name set function: %s" + % (cfg.db.url,) + ) diff --git a/lib/sqlalchemy/testing/requirements.py b/lib/sqlalchemy/testing/requirements.py index bd2d4eaf9b..7b938c9939 100644 --- a/lib/sqlalchemy/testing/requirements.py +++ b/lib/sqlalchemy/testing/requirements.py @@ -457,6 +457,13 @@ class SuiteRequirements(Requirements): """ return exclusions.closed() + @property + def default_schema_name_switch(self): + """target dialect implements provisioning module including + set_default_schema_on_connection""" + + return exclusions.closed() + @property def server_side_cursors(self): """Target dialect must support server side cursors.""" diff --git a/lib/sqlalchemy/testing/suite/test_dialect.py b/lib/sqlalchemy/testing/suite/test_dialect.py index c5ede08c69..7f697b915d 100644 --- a/lib/sqlalchemy/testing/suite/test_dialect.py +++ b/lib/sqlalchemy/testing/suite/test_dialect.py @@ -1,12 +1,15 @@ #! coding: utf-8 +from sqlalchemy import event from .. import assert_raises from .. import config +from .. import engines from .. import eq_ from .. import fixtures from .. import ne_ from .. import provide_metadata from ..config import requirements +from ..provision import set_default_schema_on_connection from ..schema import Column from ..schema import Table from ... import exc @@ -209,3 +212,76 @@ class EscapingTest(fixtures.TestBase): ), "some %% other value", ) + + +class WeCanSetDefaultSchemaWEventsTest(fixtures.TestBase): + __backend__ = True + + __requires__ = ("default_schema_name_switch",) + + def test_control_case(self): + default_schema_name = config.db.dialect.default_schema_name + + eng = engines.testing_engine() + with eng.connect(): + pass + + eq_(eng.dialect.default_schema_name, default_schema_name) + + def test_wont_work_wo_insert(self): + default_schema_name = config.db.dialect.default_schema_name + + eng = engines.testing_engine() + + @event.listens_for(eng, "connect") + def on_connect(dbapi_connection, connection_record): + set_default_schema_on_connection( + config, dbapi_connection, config.test_schema + ) + + with eng.connect() as conn: + what_it_should_be = eng.dialect._get_default_schema_name(conn) + eq_(what_it_should_be, config.test_schema) + + eq_(eng.dialect.default_schema_name, default_schema_name) + + def test_schema_change_on_connect(self): + eng = engines.testing_engine() + + @event.listens_for(eng, "connect", insert=True) + def on_connect(dbapi_connection, connection_record): + set_default_schema_on_connection( + config, dbapi_connection, config.test_schema + ) + + with eng.connect() as conn: + what_it_should_be = eng.dialect._get_default_schema_name(conn) + eq_(what_it_should_be, config.test_schema) + + eq_(eng.dialect.default_schema_name, config.test_schema) + + def test_schema_change_works_w_transactions(self): + eng = engines.testing_engine() + + @event.listens_for(eng, "connect", insert=True) + def on_connect(dbapi_connection, *arg): + set_default_schema_on_connection( + config, dbapi_connection, config.test_schema + ) + + with eng.connect() as conn: + trans = conn.begin() + what_it_should_be = eng.dialect._get_default_schema_name(conn) + eq_(what_it_should_be, config.test_schema) + trans.rollback() + + what_it_should_be = eng.dialect._get_default_schema_name(conn) + eq_(what_it_should_be, config.test_schema) + + eq_(eng.dialect.default_schema_name, config.test_schema) + + +class 
+class FutureWeCanSetDefaultSchemaWEventsTest(
+    fixtures.FutureEngineMixin, WeCanSetDefaultSchemaWEventsTest
+):
+    pass
diff --git a/lib/sqlalchemy/util/__init__.py b/lib/sqlalchemy/util/__init__.py
index 7c5257b875..d632f7fed8 100644
--- a/lib/sqlalchemy/util/__init__.py
+++ b/lib/sqlalchemy/util/__init__.py
@@ -66,6 +66,7 @@ from .compat import itertools_filter  # noqa
 from .compat import itertools_filterfalse  # noqa
 from .compat import namedtuple  # noqa
 from .compat import next  # noqa
+from .compat import nullcontext  # noqa
 from .compat import osx  # noqa
 from .compat import parse_qsl  # noqa
 from .compat import perf_counter  # noqa
diff --git a/lib/sqlalchemy/util/compat.py b/lib/sqlalchemy/util/compat.py
index 1480bbeee3..e8c4880478 100644
--- a/lib/sqlalchemy/util/compat.py
+++ b/lib/sqlalchemy/util/compat.py
@@ -47,6 +47,24 @@ FullArgSpec = collections.namedtuple(
     ],
 )
 
+
+class nullcontext(object):
+    """Context manager that does no additional processing.
+
+    Vendored from Python 3.7.
+
+    """
+
+    def __init__(self, enter_result=None):
+        self.enter_result = enter_result
+
+    def __enter__(self):
+        return self.enter_result
+
+    def __exit__(self, *excinfo):
+        pass
+
+
 try:
     import threading
 except ImportError:
diff --git a/test/dialect/oracle/test_dialect.py b/test/dialect/oracle/test_dialect.py
index 128ecc573a..aafad8dc15 100644
--- a/test/dialect/oracle/test_dialect.py
+++ b/test/dialect/oracle/test_dialect.py
@@ -24,6 +24,8 @@ from sqlalchemy.testing import assert_raises
 from sqlalchemy.testing import assert_raises_message
 from sqlalchemy.testing import AssertsCompiledSQL
 from sqlalchemy.testing import AssertsExecutionResults
+from sqlalchemy.testing import config
+from sqlalchemy.testing import engines
 from sqlalchemy.testing import eq_
 from sqlalchemy.testing import fixtures
 from sqlalchemy.testing import mock
@@ -65,6 +67,50 @@ class DialectTest(fixtures.TestBase):
             cx_oracle.OracleDialect_cx_oracle(dbapi=Mock())
 
 
+class DefaultSchemaNameTest(fixtures.TestBase):
+    __backend__ = True
+    __only_on__ = "oracle"
+
+    def test_default_name_is_the_user(self):
+        default_schema_name = testing.db.dialect.default_schema_name
+
+        with testing.db.connect() as conn:
+            oracles_known_default_schema_name = (
+                testing.db.dialect.normalize_name(
+                    conn.exec_driver_sql("SELECT USER FROM DUAL").scalar()
+                )
+            )
+
+        eq_(oracles_known_default_schema_name, default_schema_name)
+
+    def test_default_schema_detected(self):
+        default_schema_name = testing.db.dialect.default_schema_name
+
+        eng = engines.testing_engine()
+
+        with eng.connect() as conn:
+            eq_(
+                testing.db.dialect._get_default_schema_name(conn),
+                default_schema_name,
+            )
+
+            conn.exec_driver_sql(
+                "ALTER SESSION SET CURRENT_SCHEMA=%s" % config.test_schema
+            )
+
+            eq_(
+                testing.db.dialect._get_default_schema_name(conn),
+                config.test_schema,
+            )
+
+            conn.invalidate()
+
+            eq_(
+                testing.db.dialect._get_default_schema_name(conn),
+                default_schema_name,
+            )
+
+
 class EncodingErrorsTest(fixtures.TestBase):
     """mock test for encoding_errors.
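For reference, the behavior the new query relies on, sketched against an Oracle connection (``engine`` and the ``test_schema`` name are placeholders): after ``ALTER SESSION SET CURRENT_SCHEMA``, ``SELECT USER FROM DUAL`` still returns the login user, while ``sys_context('userenv', 'current_schema')`` reflects the session-local schema that the dialect now reports:

    # assumes "engine" is an Oracle engine and "test_schema" is an existing schema
    with engine.connect() as conn:
        conn.exec_driver_sql("ALTER SESSION SET CURRENT_SCHEMA=test_schema")

        # unaffected by CURRENT_SCHEMA; still the login user
        login_user = conn.exec_driver_sql("SELECT USER FROM DUAL").scalar()

        # reflects the session-local schema; this is what
        # OracleDialect._get_default_schema_name() now returns (normalized)
        current_schema = conn.exec_driver_sql(
            "select sys_context( 'userenv', 'current_schema' ) from dual"
        ).scalar()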
diff --git a/test/dialect/postgresql/test_dialect.py b/test/dialect/postgresql/test_dialect.py
index bfdd148454..5cea604d68 100644
--- a/test/dialect/postgresql/test_dialect.py
+++ b/test/dialect/postgresql/test_dialect.py
@@ -39,6 +39,7 @@ from sqlalchemy.engine import url
 from sqlalchemy.testing import engines
 from sqlalchemy.testing import fixtures
 from sqlalchemy.testing import is_
+from sqlalchemy.testing import is_false
 from sqlalchemy.testing import is_true
 from sqlalchemy.testing import mock
 from sqlalchemy.testing.assertions import assert_raises
@@ -802,6 +803,55 @@ class MiscBackendTest(
         with engine.connect():
             eq_(engine.dialect._backslash_escapes, expected)
 
+    def test_dbapi_autocommit_attribute(self):
+        """all the supported DBAPIs have an .autocommit attribute. make
+        sure it works and preserves isolation level.
+
+        This is added in particular to support the asyncpg dialect that
+        has a DBAPI compatibility layer.
+
+        """
+
+        with testing.db.connect().execution_options(
+            isolation_level="SERIALIZABLE"
+        ) as conn:
+            dbapi_conn = conn.connection.connection
+
+            is_false(dbapi_conn.autocommit)
+
+            with conn.begin():
+
+                existing_isolation = conn.exec_driver_sql(
+                    "show transaction isolation level"
+                ).scalar()
+                eq_(existing_isolation.upper(), "SERIALIZABLE")
+
+                txid1 = conn.exec_driver_sql("select txid_current()").scalar()
+                txid2 = conn.exec_driver_sql("select txid_current()").scalar()
+                eq_(txid1, txid2)
+
+            dbapi_conn.autocommit = True
+
+            with conn.begin():
+                # magic way to see if we are in autocommit mode from
+                # the server's perspective
+                txid1 = conn.exec_driver_sql("select txid_current()").scalar()
+                txid2 = conn.exec_driver_sql("select txid_current()").scalar()
+                ne_(txid1, txid2)
+
+            dbapi_conn.autocommit = False
+
+            with conn.begin():
+
+                existing_isolation = conn.exec_driver_sql(
+                    "show transaction isolation level"
+                ).scalar()
+                eq_(existing_isolation.upper(), "SERIALIZABLE")
+
+                txid1 = conn.exec_driver_sql("select txid_current()").scalar()
+                txid2 = conn.exec_driver_sql("select txid_current()").scalar()
+                eq_(txid1, txid2)
+
     def test_readonly_flag_connection(self):
         with testing.db.connect() as conn:
             # asyncpg requires serializable for readonly..
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index f24b1c1108..efec9376c1 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -1441,6 +1441,111 @@ class EngineEventsTest(fixtures.TestBase):
         eq_(canary.be1.call_count, 2)
         eq_(canary.be2.call_count, 2)
 
+    @testing.combinations((True, False), (True, True), (False, False))
+    def test_insert_connect_is_definitely_first(
+        self, mock_out_on_connect, add_our_own_onconnect
+    ):
+        """test issue #5708.
+
+        We want to ensure that a single "connect" event may be invoked
+        *before* dialect initialize as well as before dialect on_connects.
+
+        This is also partially reliant on the changes we made as a result of
+        #5497, however here we go further with the changes and remove use
+        of the pool first_connect() event entirely so that the startup
+        for a dialect is fully consistent.
+ + """ + if mock_out_on_connect: + if add_our_own_onconnect: + + def our_connect(connection): + m1.our_connect("our connect event") + + patcher = mock.patch.object( + config.db.dialect.__class__, + "on_connect", + lambda self: our_connect, + ) + else: + patcher = mock.patch.object( + config.db.dialect.__class__, + "on_connect", + lambda self: None, + ) + else: + patcher = util.nullcontext() + + with patcher: + e1 = create_engine(config.db_url) + + initialize = e1.dialect.initialize + + def init(connection): + initialize(connection) + + with mock.patch.object( + e1.dialect, "initialize", side_effect=init + ) as m1: + + @event.listens_for(e1, "connect", insert=True) + def go1(dbapi_conn, xyz): + m1.foo("custom event first") + + @event.listens_for(e1, "connect") + def go2(dbapi_conn, xyz): + m1.foo("custom event last") + + c1 = e1.connect() + + m1.bar("ok next connection") + + c2 = e1.connect() + + # this happens with sqlite singletonthreadpool. + # we can almost use testing.requires.independent_connections + # but sqlite file backend will also have independent + # connections here. + its_the_same_connection = ( + c1.connection.connection is c2.connection.connection + ) + c1.close() + c2.close() + + if add_our_own_onconnect: + calls = [ + mock.call.foo("custom event first"), + mock.call.our_connect("our connect event"), + mock.call(mock.ANY), + mock.call.foo("custom event last"), + mock.call.bar("ok next connection"), + ] + else: + calls = [ + mock.call.foo("custom event first"), + mock.call(mock.ANY), + mock.call.foo("custom event last"), + mock.call.bar("ok next connection"), + ] + + if not its_the_same_connection: + if add_our_own_onconnect: + calls.extend( + [ + mock.call.foo("custom event first"), + mock.call.our_connect("our connect event"), + mock.call.foo("custom event last"), + ] + ) + else: + calls.extend( + [ + mock.call.foo("custom event first"), + mock.call.foo("custom event last"), + ] + ) + eq_(m1.mock_calls, calls) + def test_new_exec_driver_sql_no_events(self): m1 = Mock() diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py index eb705da61a..9ea3065b03 100644 --- a/test/engine/test_pool.py +++ b/test/engine/test_pool.py @@ -493,6 +493,26 @@ class PoolEventsTest(PoolTestBase): p.connect() eq_(canary, ["connect"]) + def test_connect_insert_event(self): + p = self._queuepool_fixture() + canary = [] + + def connect_one(*arg, **kw): + canary.append("connect_one") + + def connect_two(*arg, **kw): + canary.append("connect_two") + + def connect_three(*arg, **kw): + canary.append("connect_three") + + event.listen(p, "connect", connect_one) + event.listen(p, "connect", connect_two, insert=True) + event.listen(p, "connect", connect_three) + + p.connect() + eq_(canary, ["connect_two", "connect_one", "connect_three"]) + def test_connect_event_fires_subsequent(self): p, canary = self._connect_event_fixture() diff --git a/test/requirements.py b/test/requirements.py index 2cdc3baca7..5911d87af8 100644 --- a/test/requirements.py +++ b/test/requirements.py @@ -552,6 +552,10 @@ class DefaultRequirements(SuiteRequirements): """ return only_on(["postgresql"]) + @property + def default_schema_name_switch(self): + return only_on(["postgresql", "oracle"]) + @property def unique_constraint_reflection(self): return fails_on_everything_except(