--- /dev/null
+.. change::
+ :tags: usecase, postgresql
+ :tickets: 5549
+
+ Added support for PostgreSQL "readonly" and "deferrable" flags for all of
+ psycopg2, asyncpg and pg8000 dialects. This takes advantage of a newly
+ generalized version of the "isolation level" API to support other kinds of
+ session attributes set via execution options that are reliably reset
+ when connections are returned to the connection pool.
+
+ .. seealso::
+
+ :ref:`postgresql_readonly_deferrable`
\ No newline at end of file
"dbapi",
"_connection",
"isolation_level",
+ "readonly",
+ "deferrable",
"_transaction",
"_started",
)
self.dbapi = dbapi
self._connection = connection
self.isolation_level = "read_committed"
+ self.readonly = False
+ self.deferrable = False
self._transaction = None
self._started = False
self.await_(self._setup_type_codecs())
try:
self._transaction = self._connection.transaction(
- isolation=self.isolation_level
+ isolation=self.isolation_level,
+ readonly=self.readonly,
+ deferrable=self.deferrable,
)
await self._transaction.start()
except Exception as error:
connection.set_isolation_level(level)
+ def set_readonly(self, connection, value):
+ connection.readonly = value
+
+ def get_readonly(self, connection):
+ return connection.readonly
+
+ def set_deferrable(self, connection, value):
+ connection.deferrable = value
+
+ def get_deferrable(self, connection):
+ return connection.deferrable
+
def create_connect_args(self, url):
opts = url.translate_connect_args(username="user")
if "port" in opts:
.. seealso::
+ :ref:`postgresql_readonly_deferrable`
+
:ref:`dbapi_autocommit`
:ref:`psycopg2_isolation_level`
:ref:`pg8000_isolation_level`
+.. _postgresql_readonly_deferrable:
+
+Setting READ ONLY / DEFERRABLE
+------------------------------
+
+Most PostgreSQL dialects support setting the "READ ONLY" and "DEFERRABLE"
+characteristics of the transaction, which is in addition to the isolation level
+setting. These two attributes can be established either in conjunction with or
+independently of the isolation level by passing the ``postgresql_readonly`` and
+``postgresql_deferrable`` flags with
+:meth:`_engine.Connection.execution_options`. The example below illustrates
+passing the ``"SERIALIZABLE"`` isolation level at the same time as setting
+"READ ONLY" and "DEFERRABLE"::
+
+ with engine.connect() as conn:
+ conn = conn.execution_options(
+ isolation_level="SERIALIZABLE",
+ postgresql_readonly=True,
+ postgresql_deferrable=True
+ )
+ with conn.begin():
+ # ... work with transaction
+
+Note that some DBAPIs such as asyncpg only support "readonly" with
+SERIALIZABLE isolation.
+
+.. versionadded:: 1.4 added support for the ``postgresql_readonly``
+ and ``postgresql_deferrable`` execution options.
+
+
.. _postgresql_schema_reflection:
Remote-Schema Table Introspection and PostgreSQL search_path
from ... import schema
from ... import sql
from ... import util
+from ...engine import characteristics
from ...engine import default
from ...engine import reflection
from ...sql import coercions
return AUTOCOMMIT_REGEXP.match(statement)
+class PGReadOnlyConnectionCharacteristic(
+ characteristics.ConnectionCharacteristic
+):
+ transactional = True
+
+ def reset_characteristic(self, dialect, dbapi_conn):
+ dialect.set_readonly(dbapi_conn, False)
+
+ def set_characteristic(self, dialect, dbapi_conn, value):
+ dialect.set_readonly(dbapi_conn, value)
+
+ def get_characteristic(self, dialect, dbapi_conn):
+ return dialect.get_readonly(dbapi_conn)
+
+
+class PGDeferrableConnectionCharacteristic(
+ characteristics.ConnectionCharacteristic
+):
+ transactional = True
+
+ def reset_characteristic(self, dialect, dbapi_conn):
+ dialect.set_deferrable(dbapi_conn, False)
+
+ def set_characteristic(self, dialect, dbapi_conn, value):
+ dialect.set_deferrable(dbapi_conn, value)
+
+ def get_characteristic(self, dialect, dbapi_conn):
+ return dialect.get_deferrable(dbapi_conn)
+
+
class PGDialect(default.DefaultDialect):
name = "postgresql"
supports_alter = True
implicit_returning = True
full_returning = True
+ connection_characteristics = (
+ default.DefaultDialect.connection_characteristics
+ )
+ connection_characteristics = connection_characteristics.union(
+ {
+ "postgresql_readonly": PGReadOnlyConnectionCharacteristic(),
+ "postgresql_deferrable": PGDeferrableConnectionCharacteristic(),
+ }
+ )
+
construct_arguments = [
(
schema.Index,
cursor.close()
return val.upper()
+ def set_readonly(self, connection, value):
+ raise NotImplementedError()
+
+ def get_readonly(self, connection):
+ raise NotImplementedError()
+
+ def set_deferrable(self, connection, value):
+ raise NotImplementedError()
+
+ def get_deferrable(self, connection):
+ raise NotImplementedError()
+
def do_begin_twophase(self, connection, xid):
self.do_begin(connection.connection)
% (level, self.name, ", ".join(self._isolation_lookup))
)
+ def set_readonly(self, connection, value):
+ cursor = connection.cursor()
+ try:
+ cursor.execute(
+ "SET SESSION CHARACTERISTICS AS TRANSACTION %s"
+ % ("READ ONLY" if value else "READ WRITE")
+ )
+ cursor.execute("COMMIT")
+ finally:
+ cursor.close()
+
+ def get_readonly(self, connection):
+ cursor = connection.cursor()
+ try:
+ cursor.execute("show transaction_read_only")
+ val = cursor.fetchone()[0]
+ finally:
+ cursor.close()
+
+ return val == "yes"
+
+ def set_deferrable(self, connection, value):
+ cursor = connection.cursor()
+ try:
+ cursor.execute(
+ "SET SESSION CHARACTERISTICS AS TRANSACTION %s"
+ % ("DEFERRABLE" if value else "NOT DEFERRABLE")
+ )
+ cursor.execute("COMMIT")
+ finally:
+ cursor.close()
+
+ def get_deferrable(self, connection):
+ cursor = connection.cursor()
+ try:
+ cursor.execute("show transaction_deferrable")
+ val = cursor.fetchone()[0]
+ finally:
+ cursor.close()
+
+ return val == "yes"
+
def set_client_encoding(self, connection, client_encoding):
# adjust for ConnectionFairy possibly being present
if hasattr(connection, "connection"):
connection.set_isolation_level(level)
+ def set_readonly(self, connection, value):
+ connection.readonly = value
+
+ def get_readonly(self, connection):
+ return connection.readonly
+
+ def set_deferrable(self, connection, value):
+ connection.deferrable = value
+
+ def get_deferrable(self, connection):
+ return connection.deferrable
+
def on_connect(self):
extras = self._psycopg2_extras()
extensions = self._psycopg2_extensions()
--- /dev/null
+import abc
+
+from ..util import ABC
+
+
+class ConnectionCharacteristic(ABC):
+ """An abstract base for an object that can set, get and reset a
+ per-connection characteristic, typically one that gets reset when the
+ connection is returned to the connection pool.
+
+ transaction isolation is the canonical example, and the
+ ``IsolationLevelCharacteristic`` implementation provides this for the
+ ``DefaultDialect``.
+
+ The ``ConnectionCharacteristic`` class should call upon the ``Dialect`` for
+ the implementation of each method. The object exists strictly to serve as
+ a dialect visitor that can be placed into the
+ ``DefaultDialect.connection_characteristics`` dictionary where it will take
+ effect for calls to :meth:`_engine.Connection.execution_options` and
+ related APIs.
+
+ .. versionadded:: 1.4
+
+ """
+
+ __slots__ = ()
+
+ transactional = False
+
+ @abc.abstractmethod
+ def reset_characteristic(self, dialect, dbapi_conn):
+ """Reset the characteristic on the connection to its default value."""
+
+ @abc.abstractmethod
+ def set_characteristic(self, dialect, dbapi_conn, value):
+ """set characteristic on the connection to a given value."""
+
+ @abc.abstractmethod
+ def get_characteristic(self, dialect, dbapi_conn):
+ """Given a DBAPI connection, get the current value of the
+ characteristic.
+
+ """
+
+
+class IsolationLevelCharacteristic(ConnectionCharacteristic):
+ transactional = True
+
+ def reset_characteristic(self, dialect, dbapi_conn):
+ dialect.reset_isolation_level(dbapi_conn)
+
+ def set_characteristic(self, dialect, dbapi_conn, value):
+ dialect.set_isolation_level(dbapi_conn, value)
+
+ def get_characteristic(self, dialect, dbapi_conn):
+ return dialect.get_isolation_level(dbapi_conn)
"""
import codecs
+import functools
import random
import re
import weakref
+from . import characteristics
from . import cursor as _cursor
from . import interfaces
from .. import event
tuple_in_values = False
+ connection_characteristics = util.immutabledict(
+ {"isolation_level": characteristics.IsolationLevelCharacteristic()}
+ )
+
engine_config_types = util.immutabledict(
[
("convert_unicode", util.bool_or_str("force")),
return [[], opts]
def set_engine_execution_options(self, engine, opts):
- if "isolation_level" in opts:
- isolation_level = opts["isolation_level"]
+ supported_names = set(self.connection_characteristics).intersection(
+ opts
+ )
+ if supported_names:
+ characteristics = util.immutabledict(
+ (name, opts[name]) for name in supported_names
+ )
@event.listens_for(engine, "engine_connect")
- def set_isolation(connection, branch):
+ def set_connection_characteristics(connection, branch):
if not branch:
- self._set_connection_isolation(connection, isolation_level)
+ self._set_connection_characteristics(
+ connection, characteristics
+ )
def set_connection_execution_options(self, connection, opts):
- if "isolation_level" in opts:
- self._set_connection_isolation(connection, opts["isolation_level"])
+ supported_names = set(self.connection_characteristics).intersection(
+ opts
+ )
+ if supported_names:
+ characteristics = util.immutabledict(
+ (name, opts[name]) for name in supported_names
+ )
+ self._set_connection_characteristics(connection, characteristics)
+
+ def _set_connection_characteristics(self, connection, characteristics):
+
+ characteristic_values = [
+ (name, self.connection_characteristics[name], value)
+ for name, value in characteristics.items()
+ ]
- def _set_connection_isolation(self, connection, level):
if connection.in_transaction():
- if connection._is_future:
- raise exc.InvalidRequestError(
- "This connection has already begun a transaction; "
- "isolation level may not be altered until transaction end"
- )
- else:
- util.warn(
- "Connection is already established with a Transaction; "
- "setting isolation_level may implicitly rollback or "
- "commit "
- "the existing transaction, or have no effect until "
- "next transaction"
- )
- self.set_isolation_level(connection.connection, level)
+ trans_objs = [
+ (name, obj)
+ for name, obj, value in characteristic_values
+ if obj.transactional
+ ]
+ if trans_objs:
+ if connection._is_future:
+ raise exc.InvalidRequestError(
+ "This connection has already begun a transaction; "
+ "%s may not be altered until transaction end"
+ % (", ".join(name for name, obj in trans_objs))
+ )
+ else:
+ util.warn(
+ "Connection is already established with a "
+ "Transaction; "
+ "setting %s may implicitly rollback or "
+ "commit "
+ "the existing transaction, or have no effect until "
+ "next transaction"
+ % (", ".join(name for name, obj in trans_objs))
+ )
+
+ dbapi_connection = connection.connection.connection
+ for name, characteristic, value in characteristic_values:
+ characteristic.set_characteristic(self, dbapi_connection, value)
connection.connection._connection_record.finalize_callback.append(
- self.reset_isolation_level
+ functools.partial(self._reset_characteristics, characteristics)
)
+ def _reset_characteristics(self, characteristics, dbapi_connection):
+ for characteristic_name in characteristics:
+ characteristic = self.connection_characteristics[
+ characteristic_name
+ ]
+ characteristic.reset_characteristic(self, dbapi_connection)
+
def do_begin(self, dbapi_connection):
pass
from ._collections import update_copy # noqa
from ._collections import WeakPopulateDict # noqa
from ._collections import WeakSequence # noqa
+from .compat import ABC # noqa
from .compat import arm # noqa
from .compat import b # noqa
from .compat import b64decode # noqa
# Unused. Kept for backwards compatibility.
callable = callable # noqa
+
+ from abc import ABC
+
else:
import base64
import ConfigParser as configparser # noqa
from urllib import unquote_plus # noqa
from urlparse import parse_qsl # noqa
+ from abc import ABCMeta
+
+ class ABC(object):
+ __metaclass__ = ABCMeta
+
try:
import cPickle as pickle
except ImportError:
".".join(str(x) for x in v)
)
+ def test_readonly_flag_connection(self):
+ with testing.db.connect() as conn:
+ # asyncpg requires serializable for readonly..
+ conn = conn.execution_options(
+ isolation_level="SERIALIZABLE", postgresql_readonly=True
+ )
+
+ dbapi_conn = conn.connection.connection
+
+ cursor = dbapi_conn.cursor()
+ cursor.execute("show transaction_read_only")
+ val = cursor.fetchone()[0]
+ cursor.close()
+ eq_(val, "on")
+
+ cursor = dbapi_conn.cursor()
+ try:
+ cursor.execute("show transaction_read_only")
+ val = cursor.fetchone()[0]
+ finally:
+ cursor.close()
+ dbapi_conn.rollback()
+ eq_(val, "off")
+
+ def test_deferrable_flag_connection(self):
+ with testing.db.connect() as conn:
+ # asyncpg but not for deferrable? which the PG docs actually
+ # state. weird
+ conn = conn.execution_options(
+ isolation_level="SERIALIZABLE", postgresql_deferrable=True
+ )
+
+ dbapi_conn = conn.connection.connection
+
+ cursor = dbapi_conn.cursor()
+ cursor.execute("show transaction_deferrable")
+ val = cursor.fetchone()[0]
+ cursor.close()
+ eq_(val, "on")
+
+ cursor = dbapi_conn.cursor()
+ try:
+ cursor.execute("show transaction_deferrable")
+ val = cursor.fetchone()[0]
+ finally:
+ cursor.close()
+ dbapi_conn.rollback()
+ eq_(val, "off")
+
+ def test_readonly_flag_engine(self):
+ engine = engines.testing_engine(
+ options={
+ "execution_options": dict(
+ isolation_level="SERIALIZABLE", postgresql_readonly=True
+ )
+ }
+ )
+ for i in range(2):
+ with engine.connect() as conn:
+ dbapi_conn = conn.connection.connection
+
+ cursor = dbapi_conn.cursor()
+ cursor.execute("show transaction_read_only")
+ val = cursor.fetchone()[0]
+ cursor.close()
+ eq_(val, "on")
+
+ cursor = dbapi_conn.cursor()
+ try:
+ cursor.execute("show transaction_read_only")
+ val = cursor.fetchone()[0]
+ finally:
+ cursor.close()
+ dbapi_conn.rollback()
+ eq_(val, "off")
+
+ def test_deferrable_flag_engine(self):
+ engine = engines.testing_engine(
+ options={
+ "execution_options": dict(
+ isolation_level="SERIALIZABLE", postgresql_deferrable=True
+ )
+ }
+ )
+
+ for i in range(2):
+ with engine.connect() as conn:
+ # asyncpg but not for deferrable? which the PG docs actually
+ # state. weird
+ dbapi_conn = conn.connection.connection
+
+ cursor = dbapi_conn.cursor()
+ cursor.execute("show transaction_deferrable")
+ val = cursor.fetchone()[0]
+ cursor.close()
+ eq_(val, "on")
+
+ cursor = dbapi_conn.cursor()
+ try:
+ cursor.execute("show transaction_deferrable")
+ val = cursor.fetchone()[0]
+ finally:
+ cursor.close()
+ dbapi_conn.rollback()
+ eq_(val, "off")
+
@testing.requires.psycopg2_compatibility
def test_psycopg2_non_standard_err(self):
# note that psycopg2 is sometimes called psycopg2cffi
from sqlalchemy import INT
from sqlalchemy import Integer
from sqlalchemy import MetaData
+from sqlalchemy import pool as _pool
from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy import util
from sqlalchemy import VARCHAR
+from sqlalchemy.engine import base
+from sqlalchemy.engine import characteristics
+from sqlalchemy.engine import default
+from sqlalchemy.engine import url
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import eq_
eq_(c2.get_isolation_level(), self._non_default_isolation_level())
+class ConnectionCharacteristicTest(fixtures.TestBase):
+ @testing.fixture
+ def characteristic_fixture(self):
+ class FooCharacteristic(characteristics.ConnectionCharacteristic):
+ transactional = True
+
+ def reset_characteristic(self, dialect, dbapi_conn):
+
+ dialect.reset_foo(dbapi_conn)
+
+ def set_characteristic(self, dialect, dbapi_conn, value):
+
+ dialect.set_foo(dbapi_conn, value)
+
+ def get_characteristic(self, dialect, dbapi_conn):
+ return dialect.get_foo(dbapi_conn)
+
+ class FooDialect(default.DefaultDialect):
+ connection_characteristics = util.immutabledict(
+ {"foo": FooCharacteristic()}
+ )
+
+ def reset_foo(self, dbapi_conn):
+ dbapi_conn.foo = "original_value"
+
+ def set_foo(self, dbapi_conn, value):
+ dbapi_conn.foo = value
+
+ def get_foo(self, dbapi_conn):
+ return dbapi_conn.foo
+
+ connection = mock.Mock()
+
+ def creator():
+ connection.foo = "original_value"
+ return connection
+
+ pool = _pool.SingletonThreadPool(creator=creator)
+ u = url.make_url("foo://")
+ return base.Engine(pool, FooDialect(), u), connection
+
+ def test_engine_param_stays(self, characteristic_fixture):
+
+ engine, connection = characteristic_fixture
+
+ foo_level = engine.dialect.get_foo(engine.connect().connection)
+
+ new_level = "new_level"
+
+ ne_(foo_level, new_level)
+
+ eng = engine.execution_options(foo=new_level)
+ eq_(eng.dialect.get_foo(eng.connect().connection), new_level)
+
+ # check that it stays
+ conn = eng.connect()
+ eq_(eng.dialect.get_foo(conn.connection), new_level)
+ conn.close()
+
+ conn = eng.connect()
+ eq_(eng.dialect.get_foo(conn.connection), new_level)
+ conn.close()
+
+ def test_default_level(self, characteristic_fixture):
+ engine, connection = characteristic_fixture
+
+ eq_(
+ engine.dialect.get_foo(engine.connect().connection),
+ "original_value",
+ )
+
+ def test_connection_invalidated(self, characteristic_fixture):
+ engine, connection = characteristic_fixture
+
+ conn = engine.connect()
+ c2 = conn.execution_options(foo="new_value")
+ eq_(connection.foo, "new_value")
+ c2.invalidate()
+ c2.connection
+
+ eq_(connection.foo, "original_value")
+
+ def test_warning_in_transaction(self, characteristic_fixture):
+ engine, connection = characteristic_fixture
+
+ c1 = engine.connect()
+ with expect_warnings(
+ "Connection is already established with a Transaction; "
+ "setting foo may implicitly rollback or commit "
+ "the existing transaction, or have no effect until next "
+ "transaction"
+ ):
+ with c1.begin():
+ c1 = c1.execution_options(foo="new_foo")
+
+ eq_(
+ engine.dialect.get_foo(c1.connection), "new_foo",
+ )
+ # stays outside of transaction
+ eq_(engine.dialect.get_foo(c1.connection), "new_foo")
+
+ @testing.fails("no error is raised yet here.")
+ def test_per_statement_bzzt(self, characteristic_fixture):
+ engine, connection = characteristic_fixture
+
+ # this would need some on-execute mechanism to look inside of
+ # the characteristics list. unfortunately this would
+ # add some latency.
+
+ assert_raises_message(
+ exc.ArgumentError,
+ r"'foo' execution option may only be specified "
+ r"on Connection.execution_options\(\), or "
+ r"per-engine using the isolation_level "
+ r"argument to create_engine\(\).",
+ connection.execute,
+ select([1]).execution_options(foo="bar"),
+ )
+
+ def test_per_engine(self, characteristic_fixture):
+
+ engine, connection = characteristic_fixture
+
+ pool, dialect, url = engine.pool, engine.dialect, engine.url
+
+ eng = base.Engine(
+ pool, dialect, url, execution_options={"foo": "new_value"}
+ )
+
+ conn = eng.connect()
+ eq_(eng.dialect.get_foo(conn.connection), "new_value")
+
+ def test_per_option_engine(self, characteristic_fixture):
+
+ engine, connection = characteristic_fixture
+
+ eng = engine.execution_options(foo="new_value")
+
+ conn = eng.connect()
+ eq_(
+ eng.dialect.get_foo(conn.connection), "new_value",
+ )
+
+
class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
"""Still some debate over if the "reset agent" should apply to the
future connection or not.
assert_raises_message(
exc.InvalidRequestError,
"This connection has already begun a transaction; "
- "isolation level may not be altered until transaction end",
+ "isolation_level may not be altered until transaction end",
conn.execution_options,
isolation_level="AUTOCOMMIT",
)
assert_raises_message(
exc.InvalidRequestError,
"This connection has already begun a transaction; "
- "isolation level may not be altered until transaction end",
+ "isolation_level may not be altered until transaction end",
conn.execution_options,
isolation_level="AUTOCOMMIT",
)