if self._has_events or self.engine._has_events:
self.dispatch.engine_connect(self)
- @util.memoized_property
- def _message_formatter(self) -> Any:
- if "logging_token" in self._execution_options:
- token = self._execution_options["logging_token"]
- return lambda msg: "[%s] %s" % (token, msg)
- else:
- return None
+ # this can be assigned differently via
+ # characteristics.LoggingTokenCharacteristic
+ _message_formatter: Any = None
def _log_info(self, message: str, *arg: Any, **kw: Any) -> None:
fmt = self._message_formatter
from typing import ClassVar
if typing.TYPE_CHECKING:
+ from .base import Connection
from .interfaces import DBAPIConnection
from .interfaces import Dialect
def reset_characteristic(
self, dialect: Dialect, dbapi_conn: DBAPIConnection
) -> None:
- """Reset the characteristic on the connection to its default value."""
+ """Reset the characteristic on the DBAPI connection to its default
+ value."""
@abc.abstractmethod
def set_characteristic(
self, dialect: Dialect, dbapi_conn: DBAPIConnection, value: Any
) -> None:
- """set characteristic on the connection to a given value."""
+ """set characteristic on the DBAPI connection to a given value."""
+
+ def set_connection_characteristic(
+ self,
+ dialect: Dialect,
+ conn: Connection,
+ dbapi_conn: DBAPIConnection,
+ value: Any,
+ ) -> None:
+ """set characteristic on the :class:`_engine.Connection` to a given
+ value.
+
+ .. versionadded:: 2.0.30 - added to support elements that are local
+ to the :class:`_engine.Connection` itself.
+
+ """
+ self.set_characteristic(dialect, dbapi_conn, value)
@abc.abstractmethod
def get_characteristic(
"""
+ def get_connection_characteristic(
+ self, dialect: Dialect, conn: Connection, dbapi_conn: DBAPIConnection
+ ) -> Any:
+ """Given a :class:`_engine.Connection`, get the current value of the
+ characteristic.
+
+ .. versionadded:: 2.0.30 - added to support elements that are local
+ to the :class:`_engine.Connection` itself.
+
+ """
+ return self.get_characteristic(dialect, dbapi_conn)
+
class IsolationLevelCharacteristic(ConnectionCharacteristic):
+ """Manage the isolation level on a DBAPI connection"""
+
transactional: ClassVar[bool] = True
def reset_characteristic(
self, dialect: Dialect, dbapi_conn: DBAPIConnection
) -> Any:
return dialect.get_isolation_level(dbapi_conn)
+
+
+class LoggingTokenCharacteristic(ConnectionCharacteristic):
+ """Manage the 'logging_token' option of a :class:`_engine.Connection`.
+
+ .. versionadded:: 2.0.30
+
+ """
+
+ transactional: ClassVar[bool] = False
+
+ def reset_characteristic(
+ self, dialect: Dialect, dbapi_conn: DBAPIConnection
+ ) -> None:
+ pass
+
+ def set_characteristic(
+ self, dialect: Dialect, dbapi_conn: DBAPIConnection, value: Any
+ ) -> None:
+ raise NotImplementedError()
+
+ def set_connection_characteristic(
+ self,
+ dialect: Dialect,
+ conn: Connection,
+ dbapi_conn: DBAPIConnection,
+ value: Any,
+ ) -> None:
+ if value:
+ conn._message_formatter = lambda msg: "[%s] %s" % (value, msg)
+ else:
+ del conn._message_formatter
+
+ def get_characteristic(
+ self, dialect: Dialect, dbapi_conn: DBAPIConnection
+ ) -> Any:
+ raise NotImplementedError()
+
+ def get_connection_characteristic(
+ self, dialect: Dialect, conn: Connection, dbapi_conn: DBAPIConnection
+ ) -> Any:
+ return conn._execution_options.get("logging_token", None)
tuple_in_values = False
connection_characteristics = util.immutabledict(
- {"isolation_level": characteristics.IsolationLevelCharacteristic()}
+ {
+ "isolation_level": characteristics.IsolationLevelCharacteristic(),
+ "logging_token": characteristics.LoggingTokenCharacteristic(),
+ }
)
engine_config_types: Mapping[str, Any] = util.immutabledict(
if connection.in_transaction():
trans_objs = [
(name, obj)
- for name, obj, value in characteristic_values
+ for name, obj, _ in characteristic_values
if obj.transactional
]
if trans_objs:
)
dbapi_connection = connection.connection.dbapi_connection
- for name, characteristic, value in characteristic_values:
- characteristic.set_characteristic(self, dbapi_connection, value)
+ for _, characteristic, value in characteristic_values:
+ characteristic.set_connection_characteristic(
+ self, connection, dbapi_connection, value
+ )
connection.connection._connection_record.finalize_callback.append(
functools.partial(self._reset_characteristics, characteristics)
)
c2.close()
c3.close()
+ def test_logging_token_option_connection_updates(self, token_engine):
+ """test #11210"""
+
+ eng = token_engine
+
+ c1 = eng.connect().execution_options(logging_token="my_name_1")
+
+ self._assert_token_in_execute(c1, "my_name_1")
+
+ c1.execution_options(logging_token="my_name_2")
+
+ self._assert_token_in_execute(c1, "my_name_2")
+
+ c1.execution_options(logging_token=None)
+
+ self._assert_no_tokens_in_execute(c1)
+
+ c1.close()
+
+ def test_logging_token_option_not_transactional(self, token_engine):
+ """test #11210"""
+
+ eng = token_engine
+
+ c1 = eng.connect()
+
+ with c1.begin():
+ self._assert_no_tokens_in_execute(c1)
+
+ c1.execution_options(logging_token="my_name_1")
+
+ self._assert_token_in_execute(c1, "my_name_1")
+
+ self._assert_token_in_execute(c1, "my_name_1")
+
+ c1.close()
+
def test_logging_token_option_engine(self, token_engine):
eng = token_engine