]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Create connection characteristics API; implement postgresql flags
authorMike Bayer <mike_mp@zzzcomputing.com>
Mon, 7 Sep 2020 20:24:47 +0000 (16:24 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Tue, 8 Sep 2020 15:16:53 +0000 (11:16 -0400)
Added support for PostgreSQL "readonly" and "deferrable" flags for all of
psycopg2, asyncpg and pg8000 dialects.   This takes advantage of a newly
generalized version of the "isolation level" API to support other kinds of
session attributes set via execution options that are reliably reset
when connections are returned to the connection pool.

Fixes: #5549
Change-Id: I0ad6d7a095e49d331618274c40ce75c76afdc7dd

doc/build/changelog/unreleased_14/5549.rst [new file with mode: 0644]
lib/sqlalchemy/dialects/postgresql/asyncpg.py
lib/sqlalchemy/dialects/postgresql/base.py
lib/sqlalchemy/dialects/postgresql/pg8000.py
lib/sqlalchemy/dialects/postgresql/psycopg2.py
lib/sqlalchemy/engine/characteristics.py [new file with mode: 0644]
lib/sqlalchemy/engine/default.py
lib/sqlalchemy/util/__init__.py
lib/sqlalchemy/util/compat.py
test/dialect/postgresql/test_dialect.py
test/engine/test_transaction.py

diff --git a/doc/build/changelog/unreleased_14/5549.rst b/doc/build/changelog/unreleased_14/5549.rst
new file mode 100644 (file)
index 0000000..a555b82
--- /dev/null
@@ -0,0 +1,13 @@
+.. change::
+    :tags: usecase, postgresql
+    :tickets: 5549
+
+    Added support for PostgreSQL "readonly" and "deferrable" flags for all of
+    psycopg2, asyncpg and pg8000 dialects.   This takes advantage of a newly
+    generalized version of the "isolation level" API to support other kinds of
+    session attributes set via execution options that are reliably reset
+    when connections are returned to the connection pool.
+
+    .. seealso::
+
+        :ref:`postgresql_readonly_deferrable`
\ No newline at end of file
index 515ef6e288855b78485eebbc2028e723a0657b91..eb87249b40ad7adc10844386cf02bb1ddebe6539 100644 (file)
@@ -465,6 +465,8 @@ class AsyncAdapt_asyncpg_connection:
         "dbapi",
         "_connection",
         "isolation_level",
+        "readonly",
+        "deferrable",
         "_transaction",
         "_started",
     )
@@ -475,6 +477,8 @@ class AsyncAdapt_asyncpg_connection:
         self.dbapi = dbapi
         self._connection = connection
         self.isolation_level = "read_committed"
+        self.readonly = False
+        self.deferrable = False
         self._transaction = None
         self._started = False
         self.await_(self._setup_type_codecs())
@@ -530,7 +534,9 @@ class AsyncAdapt_asyncpg_connection:
 
         try:
             self._transaction = self._connection.transaction(
-                isolation=self.isolation_level
+                isolation=self.isolation_level,
+                readonly=self.readonly,
+                deferrable=self.deferrable,
             )
             await self._transaction.start()
         except Exception as error:
@@ -763,6 +769,18 @@ class PGDialect_asyncpg(PGDialect):
 
         connection.set_isolation_level(level)
 
+    def set_readonly(self, connection, value):
+        connection.readonly = value
+
+    def get_readonly(self, connection):
+        return connection.readonly
+
+    def set_deferrable(self, connection, value):
+        connection.deferrable = value
+
+    def get_deferrable(self, connection):
+        return connection.deferrable
+
     def create_connect_args(self, url):
         opts = url.translate_connect_args(username="user")
         if "port" in opts:
index 8eb116111d826372f00bd66d0c79a339d56680d3..6550dd20d73e662540ca54a33a595fe4b69bd3c5 100644 (file)
@@ -152,12 +152,44 @@ Valid values for ``isolation_level`` include:
 
 .. seealso::
 
+    :ref:`postgresql_readonly_deferrable`
+
     :ref:`dbapi_autocommit`
 
     :ref:`psycopg2_isolation_level`
 
     :ref:`pg8000_isolation_level`
 
+.. _postgresql_readonly_deferrable:
+
+Setting READ ONLY / DEFERRABLE
+------------------------------
+
+Most PostgreSQL dialects support setting the "READ ONLY" and "DEFERRABLE"
+characteristics of the transaction, which is in addition to the isolation level
+setting. These two attributes can be established either in conjunction with or
+independently of the isolation level by passing the ``postgresql_readonly`` and
+``postgresql_deferrable`` flags with
+:meth:`_engine.Connection.execution_options`.  The example below illustrates
+passing the ``"SERIALIZABLE"`` isolation level at the same time as setting
+"READ ONLY" and "DEFERRABLE"::
+
+    with engine.connect() as conn:
+        conn = conn.execution_options(
+            isolation_level="SERIALIZABLE",
+            postgresql_readonly=True,
+            postgresql_deferrable=True
+        )
+        with conn.begin():
+            #  ... work with transaction
+
+Note that some DBAPIs such as asyncpg only support "readonly" with
+SERIALIZABLE isolation.
+
+.. versionadded:: 1.4 added support for the ``postgresql_readonly``
+   and ``postgresql_deferrable`` execution options.
+
+
 .. _postgresql_schema_reflection:
 
 Remote-Schema Table Introspection and PostgreSQL search_path
@@ -1037,6 +1069,7 @@ from ... import exc
 from ... import schema
 from ... import sql
 from ... import util
+from ...engine import characteristics
 from ...engine import default
 from ...engine import reflection
 from ...sql import coercions
@@ -2610,6 +2643,36 @@ class PGExecutionContext(default.DefaultExecutionContext):
         return AUTOCOMMIT_REGEXP.match(statement)
 
 
+class PGReadOnlyConnectionCharacteristic(
+    characteristics.ConnectionCharacteristic
+):
+    transactional = True
+
+    def reset_characteristic(self, dialect, dbapi_conn):
+        dialect.set_readonly(dbapi_conn, False)
+
+    def set_characteristic(self, dialect, dbapi_conn, value):
+        dialect.set_readonly(dbapi_conn, value)
+
+    def get_characteristic(self, dialect, dbapi_conn):
+        return dialect.get_readonly(dbapi_conn)
+
+
+class PGDeferrableConnectionCharacteristic(
+    characteristics.ConnectionCharacteristic
+):
+    transactional = True
+
+    def reset_characteristic(self, dialect, dbapi_conn):
+        dialect.set_deferrable(dbapi_conn, False)
+
+    def set_characteristic(self, dialect, dbapi_conn, value):
+        dialect.set_deferrable(dbapi_conn, value)
+
+    def get_characteristic(self, dialect, dbapi_conn):
+        return dialect.get_deferrable(dbapi_conn)
+
+
 class PGDialect(default.DefaultDialect):
     name = "postgresql"
     supports_alter = True
@@ -2645,6 +2708,16 @@ class PGDialect(default.DefaultDialect):
     implicit_returning = True
     full_returning = True
 
+    connection_characteristics = (
+        default.DefaultDialect.connection_characteristics
+    )
+    connection_characteristics = connection_characteristics.union(
+        {
+            "postgresql_readonly": PGReadOnlyConnectionCharacteristic(),
+            "postgresql_deferrable": PGDeferrableConnectionCharacteristic(),
+        }
+    )
+
     construct_arguments = [
         (
             schema.Index,
@@ -2762,6 +2835,18 @@ class PGDialect(default.DefaultDialect):
         cursor.close()
         return val.upper()
 
+    def set_readonly(self, connection, value):
+        raise NotImplementedError()
+
+    def get_readonly(self, connection):
+        raise NotImplementedError()
+
+    def set_deferrable(self, connection, value):
+        raise NotImplementedError()
+
+    def get_deferrable(self, connection):
+        raise NotImplementedError()
+
     def do_begin_twophase(self, connection, xid):
         self.do_begin(connection.connection)
 
index e08332a570a5da1afa5c48f458727137ba4f1f02..fd70828ff8ab97c421bb6444ce10de3c69c050b6 100644 (file)
@@ -359,6 +359,48 @@ class PGDialect_pg8000(PGDialect):
                 % (level, self.name, ", ".join(self._isolation_lookup))
             )
 
+    def set_readonly(self, connection, value):
+        cursor = connection.cursor()
+        try:
+            cursor.execute(
+                "SET SESSION CHARACTERISTICS AS TRANSACTION %s"
+                % ("READ ONLY" if value else "READ WRITE")
+            )
+            cursor.execute("COMMIT")
+        finally:
+            cursor.close()
+
+    def get_readonly(self, connection):
+        cursor = connection.cursor()
+        try:
+            cursor.execute("show transaction_read_only")
+            val = cursor.fetchone()[0]
+        finally:
+            cursor.close()
+
+        return val == "yes"
+
+    def set_deferrable(self, connection, value):
+        cursor = connection.cursor()
+        try:
+            cursor.execute(
+                "SET SESSION CHARACTERISTICS AS TRANSACTION %s"
+                % ("DEFERRABLE" if value else "NOT DEFERRABLE")
+            )
+            cursor.execute("COMMIT")
+        finally:
+            cursor.close()
+
+    def get_deferrable(self, connection):
+        cursor = connection.cursor()
+        try:
+            cursor.execute("show transaction_deferrable")
+            val = cursor.fetchone()[0]
+        finally:
+            cursor.close()
+
+        return val == "yes"
+
     def set_client_encoding(self, connection, client_encoding):
         # adjust for ConnectionFairy possibly being present
         if hasattr(connection, "connection"):
index 2161b24fc866394058dc6753a0cd97e5bc898d9e..3cc62fc93a8e99be1c66b2192f5e4bf1ac5d7fb6 100644 (file)
@@ -803,6 +803,18 @@ class PGDialect_psycopg2(PGDialect):
 
         connection.set_isolation_level(level)
 
+    def set_readonly(self, connection, value):
+        connection.readonly = value
+
+    def get_readonly(self, connection):
+        return connection.readonly
+
+    def set_deferrable(self, connection, value):
+        connection.deferrable = value
+
+    def get_deferrable(self, connection):
+        return connection.deferrable
+
     def on_connect(self):
         extras = self._psycopg2_extras()
         extensions = self._psycopg2_extensions()
diff --git a/lib/sqlalchemy/engine/characteristics.py b/lib/sqlalchemy/engine/characteristics.py
new file mode 100644 (file)
index 0000000..c00bff4
--- /dev/null
@@ -0,0 +1,56 @@
+import abc
+
+from ..util import ABC
+
+
+class ConnectionCharacteristic(ABC):
+    """An abstract base for an object that can set, get and reset a
+    per-connection characteristic, typically one that gets reset when the
+    connection is returned to the connection pool.
+
+    transaction isolation is the canonical example, and the
+    ``IsolationLevelCharacteristic`` implementation provides this for the
+    ``DefaultDialect``.
+
+    The ``ConnectionCharacteristic`` class should call upon the ``Dialect`` for
+    the implementation of each method.   The object exists strictly to serve as
+    a dialect visitor that can be placed into the
+    ``DefaultDialect.connection_characteristics`` dictionary where it will take
+    effect for calls to :meth:`_engine.Connection.execution_options` and
+    related APIs.
+
+    .. versionadded:: 1.4
+
+    """
+
+    __slots__ = ()
+
+    transactional = False
+
+    @abc.abstractmethod
+    def reset_characteristic(self, dialect, dbapi_conn):
+        """Reset the characteristic on the connection to its default value."""
+
+    @abc.abstractmethod
+    def set_characteristic(self, dialect, dbapi_conn, value):
+        """set characteristic on the connection to a given value."""
+
+    @abc.abstractmethod
+    def get_characteristic(self, dialect, dbapi_conn):
+        """Given a DBAPI connection, get the current value of the
+        characteristic.
+
+        """
+
+
+class IsolationLevelCharacteristic(ConnectionCharacteristic):
+    transactional = True
+
+    def reset_characteristic(self, dialect, dbapi_conn):
+        dialect.reset_isolation_level(dbapi_conn)
+
+    def set_characteristic(self, dialect, dbapi_conn, value):
+        dialect.set_isolation_level(dbapi_conn, value)
+
+    def get_characteristic(self, dialect, dbapi_conn):
+        return dialect.get_isolation_level(dbapi_conn)
index 564258a288fc04b811bcac3f8c5dadaff22a3c6b..0bd4cd14c7771cae00f538a3b0e20bff5dddc4b3 100644 (file)
@@ -14,10 +14,12 @@ as the base class for their own corresponding classes.
 """
 
 import codecs
+import functools
 import random
 import re
 import weakref
 
+from . import characteristics
 from . import cursor as _cursor
 from . import interfaces
 from .. import event
@@ -85,6 +87,10 @@ class DefaultDialect(interfaces.Dialect):
 
     tuple_in_values = False
 
+    connection_characteristics = util.immutabledict(
+        {"isolation_level": characteristics.IsolationLevelCharacteristic()}
+    )
+
     engine_config_types = util.immutabledict(
         [
             ("convert_unicode", util.bool_or_str("force")),
@@ -513,38 +519,76 @@ class DefaultDialect(interfaces.Dialect):
         return [[], opts]
 
     def set_engine_execution_options(self, engine, opts):
-        if "isolation_level" in opts:
-            isolation_level = opts["isolation_level"]
+        supported_names = set(self.connection_characteristics).intersection(
+            opts
+        )
+        if supported_names:
+            characteristics = util.immutabledict(
+                (name, opts[name]) for name in supported_names
+            )
 
             @event.listens_for(engine, "engine_connect")
-            def set_isolation(connection, branch):
+            def set_connection_characteristics(connection, branch):
                 if not branch:
-                    self._set_connection_isolation(connection, isolation_level)
+                    self._set_connection_characteristics(
+                        connection, characteristics
+                    )
 
     def set_connection_execution_options(self, connection, opts):
-        if "isolation_level" in opts:
-            self._set_connection_isolation(connection, opts["isolation_level"])
+        supported_names = set(self.connection_characteristics).intersection(
+            opts
+        )
+        if supported_names:
+            characteristics = util.immutabledict(
+                (name, opts[name]) for name in supported_names
+            )
+            self._set_connection_characteristics(connection, characteristics)
+
+    def _set_connection_characteristics(self, connection, characteristics):
+
+        characteristic_values = [
+            (name, self.connection_characteristics[name], value)
+            for name, value in characteristics.items()
+        ]
 
-    def _set_connection_isolation(self, connection, level):
         if connection.in_transaction():
-            if connection._is_future:
-                raise exc.InvalidRequestError(
-                    "This connection has already begun a transaction; "
-                    "isolation level may not be altered until transaction end"
-                )
-            else:
-                util.warn(
-                    "Connection is already established with a Transaction; "
-                    "setting isolation_level may implicitly rollback or "
-                    "commit "
-                    "the existing transaction, or have no effect until "
-                    "next transaction"
-                )
-        self.set_isolation_level(connection.connection, level)
+            trans_objs = [
+                (name, obj)
+                for name, obj, value in characteristic_values
+                if obj.transactional
+            ]
+            if trans_objs:
+                if connection._is_future:
+                    raise exc.InvalidRequestError(
+                        "This connection has already begun a transaction; "
+                        "%s may not be altered until transaction end"
+                        % (", ".join(name for name, obj in trans_objs))
+                    )
+                else:
+                    util.warn(
+                        "Connection is already established with a "
+                        "Transaction; "
+                        "setting %s may implicitly rollback or "
+                        "commit "
+                        "the existing transaction, or have no effect until "
+                        "next transaction"
+                        % (", ".join(name for name, obj in trans_objs))
+                    )
+
+        dbapi_connection = connection.connection.connection
+        for name, characteristic, value in characteristic_values:
+            characteristic.set_characteristic(self, dbapi_connection, value)
         connection.connection._connection_record.finalize_callback.append(
-            self.reset_isolation_level
+            functools.partial(self._reset_characteristics, characteristics)
         )
 
+    def _reset_characteristics(self, characteristics, dbapi_connection):
+        for characteristic_name in characteristics:
+            characteristic = self.connection_characteristics[
+                characteristic_name
+            ]
+            characteristic.reset_characteristic(self, dbapi_connection)
+
     def do_begin(self, dbapi_connection):
         pass
 
index 1e3eb9a29e5d5c8521ce5fb842190ae1b70ddcb9..1d92084cce41cb8a9c96d662d21bdafce8704201 100644 (file)
@@ -44,6 +44,7 @@ from ._collections import UniqueAppender  # noqa
 from ._collections import update_copy  # noqa
 from ._collections import WeakPopulateDict  # noqa
 from ._collections import WeakSequence  # noqa
+from .compat import ABC  # noqa
 from .compat import arm  # noqa
 from .compat import b  # noqa
 from .compat import b64decode  # noqa
index caa97f72b0931c4fcbbb24dc06f85fe84be7deb3..e1d0e644443b8d9d036aabd7bbb36f6129a37b97 100644 (file)
@@ -193,6 +193,9 @@ if py3k:
 
     # Unused. Kept for backwards compatibility.
     callable = callable  # noqa
+
+    from abc import ABC
+
 else:
     import base64
     import ConfigParser as configparser  # noqa
@@ -208,6 +211,11 @@ else:
     from urllib import unquote_plus  # noqa
     from urlparse import parse_qsl  # noqa
 
+    from abc import ABCMeta
+
+    class ABC(object):
+        __metaclass__ = ABCMeta
+
     try:
         import cPickle as pickle
     except ImportError:
index 6eaa3295b9aa09882ea03f3d86083c6ce665742b..83bc6cac8d17f64b4de57c68dcc08587e8c7fb18 100644 (file)
@@ -735,6 +735,112 @@ class MiscBackendTest(
             ".".join(str(x) for x in v)
         )
 
+    def test_readonly_flag_connection(self):
+        with testing.db.connect() as conn:
+            # asyncpg requires serializable for readonly..
+            conn = conn.execution_options(
+                isolation_level="SERIALIZABLE", postgresql_readonly=True
+            )
+
+            dbapi_conn = conn.connection.connection
+
+            cursor = dbapi_conn.cursor()
+            cursor.execute("show transaction_read_only")
+            val = cursor.fetchone()[0]
+            cursor.close()
+            eq_(val, "on")
+
+        cursor = dbapi_conn.cursor()
+        try:
+            cursor.execute("show transaction_read_only")
+            val = cursor.fetchone()[0]
+        finally:
+            cursor.close()
+            dbapi_conn.rollback()
+        eq_(val, "off")
+
+    def test_deferrable_flag_connection(self):
+        with testing.db.connect() as conn:
+            # asyncpg but not for deferrable?  which the PG docs actually
+            # state.  weird
+            conn = conn.execution_options(
+                isolation_level="SERIALIZABLE", postgresql_deferrable=True
+            )
+
+            dbapi_conn = conn.connection.connection
+
+            cursor = dbapi_conn.cursor()
+            cursor.execute("show transaction_deferrable")
+            val = cursor.fetchone()[0]
+            cursor.close()
+            eq_(val, "on")
+
+        cursor = dbapi_conn.cursor()
+        try:
+            cursor.execute("show transaction_deferrable")
+            val = cursor.fetchone()[0]
+        finally:
+            cursor.close()
+            dbapi_conn.rollback()
+        eq_(val, "off")
+
+    def test_readonly_flag_engine(self):
+        engine = engines.testing_engine(
+            options={
+                "execution_options": dict(
+                    isolation_level="SERIALIZABLE", postgresql_readonly=True
+                )
+            }
+        )
+        for i in range(2):
+            with engine.connect() as conn:
+                dbapi_conn = conn.connection.connection
+
+                cursor = dbapi_conn.cursor()
+                cursor.execute("show transaction_read_only")
+                val = cursor.fetchone()[0]
+                cursor.close()
+                eq_(val, "on")
+
+            cursor = dbapi_conn.cursor()
+            try:
+                cursor.execute("show transaction_read_only")
+                val = cursor.fetchone()[0]
+            finally:
+                cursor.close()
+                dbapi_conn.rollback()
+            eq_(val, "off")
+
+    def test_deferrable_flag_engine(self):
+        engine = engines.testing_engine(
+            options={
+                "execution_options": dict(
+                    isolation_level="SERIALIZABLE", postgresql_deferrable=True
+                )
+            }
+        )
+
+        for i in range(2):
+            with engine.connect() as conn:
+                # asyncpg but not for deferrable?  which the PG docs actually
+                # state.  weird
+                dbapi_conn = conn.connection.connection
+
+                cursor = dbapi_conn.cursor()
+                cursor.execute("show transaction_deferrable")
+                val = cursor.fetchone()[0]
+                cursor.close()
+                eq_(val, "on")
+
+            cursor = dbapi_conn.cursor()
+            try:
+                cursor.execute("show transaction_deferrable")
+                val = cursor.fetchone()[0]
+            finally:
+                cursor.close()
+                dbapi_conn.rollback()
+            eq_(val, "off")
+
     @testing.requires.psycopg2_compatibility
     def test_psycopg2_non_standard_err(self):
         # note that psycopg2 is sometimes called psycopg2cffi
index cd144e45f49f5a7a11326463ed78168fbdb13e33..26ccfdfd31fede794edeb06988e7341721d60194 100644 (file)
@@ -7,12 +7,17 @@ from sqlalchemy import func
 from sqlalchemy import INT
 from sqlalchemy import Integer
 from sqlalchemy import MetaData
+from sqlalchemy import pool as _pool
 from sqlalchemy import select
 from sqlalchemy import String
 from sqlalchemy import testing
 from sqlalchemy import text
 from sqlalchemy import util
 from sqlalchemy import VARCHAR
+from sqlalchemy.engine import base
+from sqlalchemy.engine import characteristics
+from sqlalchemy.engine import default
+from sqlalchemy.engine import url
 from sqlalchemy.testing import assert_raises
 from sqlalchemy.testing import assert_raises_message
 from sqlalchemy.testing import eq_
@@ -1374,6 +1379,150 @@ class IsolationLevelTest(fixtures.TestBase):
             eq_(c2.get_isolation_level(), self._non_default_isolation_level())
 
 
+class ConnectionCharacteristicTest(fixtures.TestBase):
+    @testing.fixture
+    def characteristic_fixture(self):
+        class FooCharacteristic(characteristics.ConnectionCharacteristic):
+            transactional = True
+
+            def reset_characteristic(self, dialect, dbapi_conn):
+
+                dialect.reset_foo(dbapi_conn)
+
+            def set_characteristic(self, dialect, dbapi_conn, value):
+
+                dialect.set_foo(dbapi_conn, value)
+
+            def get_characteristic(self, dialect, dbapi_conn):
+                return dialect.get_foo(dbapi_conn)
+
+        class FooDialect(default.DefaultDialect):
+            connection_characteristics = util.immutabledict(
+                {"foo": FooCharacteristic()}
+            )
+
+            def reset_foo(self, dbapi_conn):
+                dbapi_conn.foo = "original_value"
+
+            def set_foo(self, dbapi_conn, value):
+                dbapi_conn.foo = value
+
+            def get_foo(self, dbapi_conn):
+                return dbapi_conn.foo
+
+        connection = mock.Mock()
+
+        def creator():
+            connection.foo = "original_value"
+            return connection
+
+        pool = _pool.SingletonThreadPool(creator=creator)
+        u = url.make_url("foo://")
+        return base.Engine(pool, FooDialect(), u), connection
+
+    def test_engine_param_stays(self, characteristic_fixture):
+
+        engine, connection = characteristic_fixture
+
+        foo_level = engine.dialect.get_foo(engine.connect().connection)
+
+        new_level = "new_level"
+
+        ne_(foo_level, new_level)
+
+        eng = engine.execution_options(foo=new_level)
+        eq_(eng.dialect.get_foo(eng.connect().connection), new_level)
+
+        # check that it stays
+        conn = eng.connect()
+        eq_(eng.dialect.get_foo(conn.connection), new_level)
+        conn.close()
+
+        conn = eng.connect()
+        eq_(eng.dialect.get_foo(conn.connection), new_level)
+        conn.close()
+
+    def test_default_level(self, characteristic_fixture):
+        engine, connection = characteristic_fixture
+
+        eq_(
+            engine.dialect.get_foo(engine.connect().connection),
+            "original_value",
+        )
+
+    def test_connection_invalidated(self, characteristic_fixture):
+        engine, connection = characteristic_fixture
+
+        conn = engine.connect()
+        c2 = conn.execution_options(foo="new_value")
+        eq_(connection.foo, "new_value")
+        c2.invalidate()
+        c2.connection
+
+        eq_(connection.foo, "original_value")
+
+    def test_warning_in_transaction(self, characteristic_fixture):
+        engine, connection = characteristic_fixture
+
+        c1 = engine.connect()
+        with expect_warnings(
+            "Connection is already established with a Transaction; "
+            "setting foo may implicitly rollback or commit "
+            "the existing transaction, or have no effect until next "
+            "transaction"
+        ):
+            with c1.begin():
+                c1 = c1.execution_options(foo="new_foo")
+
+                eq_(
+                    engine.dialect.get_foo(c1.connection), "new_foo",
+                )
+        # stays outside of transaction
+        eq_(engine.dialect.get_foo(c1.connection), "new_foo")
+
+    @testing.fails("no error is raised yet here.")
+    def test_per_statement_bzzt(self, characteristic_fixture):
+        engine, connection = characteristic_fixture
+
+        # this would need some on-execute mechanism to look inside of
+        # the characteristics list.   unfortunately this would
+        # add some latency.
+
+        assert_raises_message(
+            exc.ArgumentError,
+            r"'foo' execution option may only be specified "
+            r"on Connection.execution_options\(\), or "
+            r"per-engine using the isolation_level "
+            r"argument to create_engine\(\).",
+            connection.execute,
+            select([1]).execution_options(foo="bar"),
+        )
+
+    def test_per_engine(self, characteristic_fixture):
+
+        engine, connection = characteristic_fixture
+
+        pool, dialect, url = engine.pool, engine.dialect, engine.url
+
+        eng = base.Engine(
+            pool, dialect, url, execution_options={"foo": "new_value"}
+        )
+
+        conn = eng.connect()
+        eq_(eng.dialect.get_foo(conn.connection), "new_value")
+
+    def test_per_option_engine(self, characteristic_fixture):
+
+        engine, connection = characteristic_fixture
+
+        eng = engine.execution_options(foo="new_value")
+
+        conn = eng.connect()
+        eq_(
+            eng.dialect.get_foo(conn.connection), "new_value",
+        )
+
+
 class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
     """Still some debate over if the "reset agent" should apply to the
     future connection or not.
@@ -1586,7 +1735,7 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
             assert_raises_message(
                 exc.InvalidRequestError,
                 "This connection has already begun a transaction; "
-                "isolation level may not be altered until transaction end",
+                "isolation_level may not be altered until transaction end",
                 conn.execution_options,
                 isolation_level="AUTOCOMMIT",
             )
@@ -1600,7 +1749,7 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
             assert_raises_message(
                 exc.InvalidRequestError,
                 "This connection has already begun a transaction; "
-                "isolation level may not be altered until transaction end",
+                "isolation_level may not be altered until transaction end",
                 conn.execution_options,
                 isolation_level="AUTOCOMMIT",
             )