--- /dev/null
+.. change::
+ :tags: bug, pool
+ :tickets: 5708
+
+ Fixed regression where a connection pool event specified with a keyword,
+ most notably ``insert=True``, would be lost when the event was set up.
+ This would prevent startup events that need to fire before dialect-level
+ events from working correctly.
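+
+ For example (an illustrative sketch; ``engine`` is assumed to be an
+ existing :class:`_engine.Engine`), a handler registered as below is
+ again invoked ahead of previously established handlers::
+
+     from sqlalchemy import event
+
+     @event.listens_for(engine, "connect", insert=True)
+     def on_connect(dbapi_connection, connection_record):
+         # runs before other "connect" handlers, including any
+         # established by the dialect itself
+         cursor = dbapi_connection.cursor()
+         cursor.execute("SELECT 1")  # placeholder per-connection setup
+         cursor.close()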
+
+
+.. change::
+ :tags: usecase, pool
+ :tickets: 5708, 5497
+
+ The internal mechanics of the engine connection routine have been altered
+ such that it's now guaranteed that a user-defined event handler for the
+ :meth:`_pool.PoolEvents.connect` event, when established using
+ ``insert=True``, will be invoked **before** any dialect-specific
+ initialization takes place, most notably before the dialect performs
+ steps such as detecting the default schema name. Previously, this would
+ occur in most cases but not unconditionally. A new example is added to
+ the schema documentation illustrating how to establish the "default
+ schema name" within an on-connect event.
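+
+ The new documentation example is along these lines (a sketch; the
+ PostgreSQL URL and ``schema_name`` value are placeholders)::
+
+     from sqlalchemy import create_engine, event
+
+     engine = create_engine("postgresql://scott:tiger@host/dbname")
+
+     @event.listens_for(engine, "connect", insert=True)
+     def set_current_schema(dbapi_connection, connection_record):
+         # set the schema search path before the dialect detects the
+         # default schema name during its first-connect initialization
+         cursor = dbapi_connection.cursor()
+         cursor.execute("SET search_path TO %s" % schema_name)
+         cursor.close()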
+
+.. change::
+ :tags: usecase, postgresql
+
+ Added a read/write ``.autocommit`` attribute to the DBAPI-adaptation layer
+ for the asyncpg dialect, so that when working with DBAPI-specific schemes
+ that need to use "autocommit" directly on the DBAPI connection, the same
+ ``.autocommit`` attribute that works with both psycopg2 and pg8000 is
+ available.
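+
+ A sketch of one way to use this from the asyncpg dialect (the URL is a
+ placeholder; the event handler is attached to the underlying sync
+ engine)::
+
+     from sqlalchemy import event
+     from sqlalchemy.ext.asyncio import create_async_engine
+
+     engine = create_async_engine("postgresql+asyncpg://scott:tiger@host/db")
+
+     @event.listens_for(engine.sync_engine, "connect")
+     def set_autocommit(dbapi_connection, connection_record):
+         # dbapi_connection is the asyncpg DBAPI-adaptation connection;
+         # .autocommit here behaves like psycopg2's / pg8000's attribute
+         dbapi_connection.autocommit = True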
\ No newline at end of file
--- /dev/null
+.. change::
+ :tags: bug, oracle
+ :tickets: 5716
+
+ The Oracle dialect now uses
+ ``select sys_context( 'userenv', 'current_schema' ) from dual`` to get
+ the default schema name, rather than ``SELECT USER FROM DUAL``, to
+ accommodate changes to the session-local schema name under Oracle.
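+
+ An illustrative sketch of the user-visible effect, mirroring the new
+ test coverage (``engine`` and ``some_schema`` are placeholders, and
+ ``_get_default_schema_name()`` is an internal dialect method)::
+
+     with engine.connect() as conn:
+         conn.exec_driver_sql(
+             "ALTER SESSION SET CURRENT_SCHEMA=some_schema"
+         )
+         # the session-local current schema, rather than the session
+         # user, is now what the dialect reports
+         name = engine.dialect._get_default_schema_name(conn)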
\ No newline at end of file
def _get_default_schema_name(self, connection):
return self.normalize_name(
- connection.exec_driver_sql("SELECT USER FROM DUAL").scalar()
+ connection.exec_driver_sql(
+ "select sys_context( 'userenv', 'current_schema' ) from dual"
+ ).scalar()
)
def _resolve_synonym(
from ...testing.provision import follower_url_from_main
from ...testing.provision import log
from ...testing.provision import run_reap_dbs
+from ...testing.provision import set_default_schema_on_connection
from ...testing.provision import temp_table_keyword_args
from ...testing.provision import update_db_opts
"prefixes": ["GLOBAL TEMPORARY"],
"oracle_on_commit": "PRESERVE ROWS",
}
+
+
+@set_default_schema_on_connection.for_db("oracle")
+def _oracle_set_default_schema_on_connection(
+ cfg, dbapi_connection, schema_name
+):
+ cursor = dbapi_connection.cursor()
+ cursor.execute("ALTER SESSION SET CURRENT_SCHEMA=%s" % schema_name)
+ cursor.close()
def __init__(self, dbapi, connection):
self.dbapi = dbapi
self._connection = connection
- self.isolation_level = "read_committed"
+ self.isolation_level = self._isolation_setting = "read_committed"
self.readonly = False
self.deferrable = False
self._transaction = None
else:
raise error
+ @property
+ def autocommit(self):
+ return self.isolation_level == "autocommit"
+
+ @autocommit.setter
+ def autocommit(self, value):
+ if value:
+ self.isolation_level = "autocommit"
+ else:
+ self.isolation_level = self._isolation_setting
+
def set_isolation_level(self, level):
if self._started:
self.rollback()
- self.isolation_level = level
+ self.isolation_level = self._isolation_setting = level
async def _start_transaction(self):
if self.isolation_level == "autocommit":
from ...testing.provision import create_db
from ...testing.provision import drop_db
from ...testing.provision import log
+from ...testing.provision import set_default_schema_on_connection
from ...testing.provision import temp_table_keyword_args
@temp_table_keyword_args.for_db("postgresql")
def _postgresql_temp_table_keyword_args(cfg, eng):
return {"prefixes": ["TEMPORARY"]}
+
+
+@set_default_schema_on_connection.for_db("postgresql")
+def _postgresql_set_default_schema_on_connection(
+ cfg, dbapi_connection, schema_name
+):
+ existing_autocommit = dbapi_connection.autocommit
+ dbapi_connection.autocommit = True
+ cursor = dbapi_connection.cursor()
+ cursor.execute("SET SESSION search_path='%s'" % schema_name)
+ cursor.close()
+ dbapi_connection.autocommit = existing_autocommit
dialect.initialize(c)
dialect.do_rollback(c.connection)
- if do_on_connect:
- event.listen(
- pool, "connect", first_connect, _once_unless_exception=True
- )
- else:
- event.listen(
- pool,
- "first_connect",
- first_connect,
- _once_unless_exception=True,
- )
+ # previously, the "first_connect" event was used here, which was then
+ # scaled back if the "on_connect" handler was present. now,
+ # since "on_connect" is virtually always present, just use
+ # "connect" event with once_unless_exception in all cases so that
+ # the connection event flow is consistent in all cases.
+ event.listen(
+ pool, "connect", first_connect, _once_unless_exception=True
+ )
dialect_cls.engine_created(engine)
if entrypoint is not dialect_cls:
def _listen(cls, event_key, **kw):
target = event_key.dispatch_target
- event_key.base_listen(asyncio=target._is_asyncio)
+ kw.setdefault("asyncio", target._is_asyncio)
+
+ event_key.base_listen(**kw)
def connect(self, dbapi_connection, connection_record):
"""Called at the moment a particular DBAPI connection is first
use. The mssql dialect's implementation will need a "#" prepended.
"""
return base_name
+
+
+@register.init
+def set_default_schema_on_connection(cfg, dbapi_connection, schema_name):
+ raise NotImplementedError(
+ "backend does not implement a schema name set function: %s"
+ % (cfg.db.url,)
+ )
"""
return exclusions.closed()
+ @property
+ def default_schema_name_switch(self):
+ """target dialect implements provisioning module including
+ set_default_schema_on_connection"""
+
+ return exclusions.closed()
+
@property
def server_side_cursors(self):
"""Target dialect must support server side cursors."""
#! coding: utf-8
+from sqlalchemy import event
from .. import assert_raises
from .. import config
+from .. import engines
from .. import eq_
from .. import fixtures
from .. import ne_
from .. import provide_metadata
from ..config import requirements
+from ..provision import set_default_schema_on_connection
from ..schema import Column
from ..schema import Table
from ... import exc
),
"some %% other value",
)
+
+
+class WeCanSetDefaultSchemaWEventsTest(fixtures.TestBase):
+ __backend__ = True
+
+ __requires__ = ("default_schema_name_switch",)
+
+ def test_control_case(self):
+ default_schema_name = config.db.dialect.default_schema_name
+
+ eng = engines.testing_engine()
+ with eng.connect():
+ pass
+
+ eq_(eng.dialect.default_schema_name, default_schema_name)
+
+ def test_wont_work_wo_insert(self):
+ default_schema_name = config.db.dialect.default_schema_name
+
+ eng = engines.testing_engine()
+
+ @event.listens_for(eng, "connect")
+ def on_connect(dbapi_connection, connection_record):
+ set_default_schema_on_connection(
+ config, dbapi_connection, config.test_schema
+ )
+
+ with eng.connect() as conn:
+ what_it_should_be = eng.dialect._get_default_schema_name(conn)
+ eq_(what_it_should_be, config.test_schema)
+
+ eq_(eng.dialect.default_schema_name, default_schema_name)
+
+ def test_schema_change_on_connect(self):
+ eng = engines.testing_engine()
+
+ @event.listens_for(eng, "connect", insert=True)
+ def on_connect(dbapi_connection, connection_record):
+ set_default_schema_on_connection(
+ config, dbapi_connection, config.test_schema
+ )
+
+ with eng.connect() as conn:
+ what_it_should_be = eng.dialect._get_default_schema_name(conn)
+ eq_(what_it_should_be, config.test_schema)
+
+ eq_(eng.dialect.default_schema_name, config.test_schema)
+
+ def test_schema_change_works_w_transactions(self):
+ eng = engines.testing_engine()
+
+ @event.listens_for(eng, "connect", insert=True)
+ def on_connect(dbapi_connection, *arg):
+ set_default_schema_on_connection(
+ config, dbapi_connection, config.test_schema
+ )
+
+ with eng.connect() as conn:
+ trans = conn.begin()
+ what_it_should_be = eng.dialect._get_default_schema_name(conn)
+ eq_(what_it_should_be, config.test_schema)
+ trans.rollback()
+
+ what_it_should_be = eng.dialect._get_default_schema_name(conn)
+ eq_(what_it_should_be, config.test_schema)
+
+ eq_(eng.dialect.default_schema_name, config.test_schema)
+
+
+class FutureWeCanSetDefaultSchemaWEventsTest(
+ fixtures.FutureEngineMixin, WeCanSetDefaultSchemaWEventsTest
+):
+ pass
from .compat import itertools_filterfalse # noqa
from .compat import namedtuple # noqa
from .compat import next # noqa
+from .compat import nullcontext # noqa
from .compat import osx # noqa
from .compat import parse_qsl # noqa
from .compat import perf_counter # noqa
],
)
+
+class nullcontext(object):
+ """Context manager that does no additional processing.
+
+ Vendored from Python 3.7.
+
+ """
+
+ def __init__(self, enter_result=None):
+ self.enter_result = enter_result
+
+ def __enter__(self):
+ return self.enter_result
+
+ def __exit__(self, *excinfo):
+ pass
+
+
try:
import threading
except ImportError:
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import AssertsExecutionResults
+from sqlalchemy.testing import config
+from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import mock
cx_oracle.OracleDialect_cx_oracle(dbapi=Mock())
+class DefaultSchemaNameTest(fixtures.TestBase):
+ __backend__ = True
+ __only_on__ = "oracle"
+
+ def test_default_name_is_the_user(self):
+ default_schema_name = testing.db.dialect.default_schema_name
+
+ with testing.db.connect() as conn:
+ oracles_known_default_schema_name = (
+ testing.db.dialect.normalize_name(
+ conn.exec_driver_sql("SELECT USER FROM DUAL").scalar()
+ )
+ )
+
+ eq_(oracles_known_default_schema_name, default_schema_name)
+
+ def test_default_schema_detected(self):
+ default_schema_name = testing.db.dialect.default_schema_name
+
+ eng = engines.testing_engine()
+
+ with eng.connect() as conn:
+ eq_(
+ testing.db.dialect._get_default_schema_name(conn),
+ default_schema_name,
+ )
+
+ conn.exec_driver_sql(
+ "ALTER SESSION SET CURRENT_SCHEMA=%s" % config.test_schema
+ )
+
+ eq_(
+ testing.db.dialect._get_default_schema_name(conn),
+ config.test_schema,
+ )
+
+ conn.invalidate()
+
+ eq_(
+ testing.db.dialect._get_default_schema_name(conn),
+ default_schema_name,
+ )
+
+
class EncodingErrorsTest(fixtures.TestBase):
"""mock test for encoding_errors.
from sqlalchemy.testing import engines
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
+from sqlalchemy.testing import is_false
from sqlalchemy.testing import is_true
from sqlalchemy.testing import mock
from sqlalchemy.testing.assertions import assert_raises
with engine.connect():
eq_(engine.dialect._backslash_escapes, expected)
+ def test_dbapi_autocommit_attribute(self):
+ """all the supported DBAPIs have an .autocommit attribute. make
+ sure it works and preserves isolation level.
+
+ This is added in particular to support the asyncpg dialect that
+ has a DBAPI compatibility layer.
+
+ """
+
+ with testing.db.connect().execution_options(
+ isolation_level="SERIALIZABLE"
+ ) as conn:
+ dbapi_conn = conn.connection.connection
+
+ is_false(dbapi_conn.autocommit)
+
+ with conn.begin():
+
+ existing_isolation = conn.exec_driver_sql(
+ "show transaction isolation level"
+ ).scalar()
+ eq_(existing_isolation.upper(), "SERIALIZABLE")
+
+ txid1 = conn.exec_driver_sql("select txid_current()").scalar()
+ txid2 = conn.exec_driver_sql("select txid_current()").scalar()
+ eq_(txid1, txid2)
+
+ dbapi_conn.autocommit = True
+
+ with conn.begin():
+ # magic way to see if we are in autocommit mode from
+ # the server's perspective
+ txid1 = conn.exec_driver_sql("select txid_current()").scalar()
+ txid2 = conn.exec_driver_sql("select txid_current()").scalar()
+ ne_(txid1, txid2)
+
+ dbapi_conn.autocommit = False
+
+ with conn.begin():
+
+ existing_isolation = conn.exec_driver_sql(
+ "show transaction isolation level"
+ ).scalar()
+ eq_(existing_isolation.upper(), "SERIALIZABLE")
+
+ txid1 = conn.exec_driver_sql("select txid_current()").scalar()
+ txid2 = conn.exec_driver_sql("select txid_current()").scalar()
+ eq_(txid1, txid2)
+
def test_readonly_flag_connection(self):
with testing.db.connect() as conn:
# asyncpg requires serializable for readonly..
eq_(canary.be1.call_count, 2)
eq_(canary.be2.call_count, 2)
+ @testing.combinations((True, False), (True, True), (False, False))
+ def test_insert_connect_is_definitely_first(
+ self, mock_out_on_connect, add_our_own_onconnect
+ ):
+ """test issue #5708.
+
+ We want to ensure that a single "connect" event may be invoked
+ *before* dialect initialization as well as before dialect-level
+ on_connect handlers.
+
+ This is also partially reliant on the changes made as a result of
+ #5497; however, here we go further and remove use of the pool
+ first_connect() event entirely, so that the startup sequence for a
+ dialect is fully consistent.
+
+ """
+ if mock_out_on_connect:
+ if add_our_own_onconnect:
+
+ def our_connect(connection):
+ m1.our_connect("our connect event")
+
+ patcher = mock.patch.object(
+ config.db.dialect.__class__,
+ "on_connect",
+ lambda self: our_connect,
+ )
+ else:
+ patcher = mock.patch.object(
+ config.db.dialect.__class__,
+ "on_connect",
+ lambda self: None,
+ )
+ else:
+ patcher = util.nullcontext()
+
+ with patcher:
+ e1 = create_engine(config.db_url)
+
+ initialize = e1.dialect.initialize
+
+ def init(connection):
+ initialize(connection)
+
+ with mock.patch.object(
+ e1.dialect, "initialize", side_effect=init
+ ) as m1:
+
+ @event.listens_for(e1, "connect", insert=True)
+ def go1(dbapi_conn, xyz):
+ m1.foo("custom event first")
+
+ @event.listens_for(e1, "connect")
+ def go2(dbapi_conn, xyz):
+ m1.foo("custom event last")
+
+ c1 = e1.connect()
+
+ m1.bar("ok next connection")
+
+ c2 = e1.connect()
+
+ # this happens with sqlite singletonthreadpool.
+ # we can almost use testing.requires.independent_connections
+ # but sqlite file backend will also have independent
+ # connections here.
+ its_the_same_connection = (
+ c1.connection.connection is c2.connection.connection
+ )
+ c1.close()
+ c2.close()
+
+ if add_our_own_onconnect:
+ calls = [
+ mock.call.foo("custom event first"),
+ mock.call.our_connect("our connect event"),
+ mock.call(mock.ANY),
+ mock.call.foo("custom event last"),
+ mock.call.bar("ok next connection"),
+ ]
+ else:
+ calls = [
+ mock.call.foo("custom event first"),
+ mock.call(mock.ANY),
+ mock.call.foo("custom event last"),
+ mock.call.bar("ok next connection"),
+ ]
+
+ if not its_the_same_connection:
+ if add_our_own_onconnect:
+ calls.extend(
+ [
+ mock.call.foo("custom event first"),
+ mock.call.our_connect("our connect event"),
+ mock.call.foo("custom event last"),
+ ]
+ )
+ else:
+ calls.extend(
+ [
+ mock.call.foo("custom event first"),
+ mock.call.foo("custom event last"),
+ ]
+ )
+ eq_(m1.mock_calls, calls)
+
def test_new_exec_driver_sql_no_events(self):
m1 = Mock()
p.connect()
eq_(canary, ["connect"])
+ def test_connect_insert_event(self):
+ p = self._queuepool_fixture()
+ canary = []
+
+ def connect_one(*arg, **kw):
+ canary.append("connect_one")
+
+ def connect_two(*arg, **kw):
+ canary.append("connect_two")
+
+ def connect_three(*arg, **kw):
+ canary.append("connect_three")
+
+ event.listen(p, "connect", connect_one)
+ event.listen(p, "connect", connect_two, insert=True)
+ event.listen(p, "connect", connect_three)
+
+ p.connect()
+ eq_(canary, ["connect_two", "connect_one", "connect_three"])
+
def test_connect_event_fires_subsequent(self):
p, canary = self._connect_event_fixture()
"""
return only_on(["postgresql"])
+ @property
+ def default_schema_name_switch(self):
+ return only_on(["postgresql", "oracle"])
+
@property
def unique_constraint_reflection(self):
return fails_on_everything_except(