--- /dev/null
+.. change::
+ :tags: usecase, engine
+ :tickets: 13116
+
+ The connection object returned by :meth:`_engine.Engine.raw_connection`
+ now supports the context manager protocol, automatically returning the
+ connection to the pool when exiting the context.
from .. import exc
from .. import log
from .. import util
+from ..util.typing import Self
if TYPE_CHECKING:
from ..engine.interfaces import DBAPIConnection
"""
raise NotImplementedError()
+ def __enter__(self) -> Self:
+ return self
+
+ def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
+ self.close()
+ return None
+
class _AdhocProxiedConnection(PoolProxiedConnection):
"""provides the :class:`.PoolProxiedConnection` interface for cases where
from sqlalchemy.testing import is_not_none
from sqlalchemy.testing import is_true
from sqlalchemy.testing import mock
+from sqlalchemy.testing.assertions import expect_raises_message
from sqlalchemy.testing.engines import testing_engine
from sqlalchemy.testing.util import gc_collect
from sqlalchemy.testing.util import lazy_gc
assert not c2.info
assert "foo2" in c.info
+ def test_raw_connection_context(self):
+ p = self._queuepool_fixture(pool_size=1, max_overflow=0)
+ e = create_engine("sqlite://", pool=p, _initialize=False)
+
+ eq_(p.checkedout(), 0)
+ conn = e.raw_connection()
+ eq_(p.checkedout(), 1)
+ with conn as wc:
+ eq_(p.checkedout(), 1)
+ is_(conn, wc)
+
+ eq_(p.checkedout(), 0)
+
+ with expect_raises_message(ValueError, "an error"):
+ with e.raw_connection() as wc:
+ eq_(p.checkedout(), 1)
+ raise ValueError("an error")
+ eq_(p.checkedout(), 0)
+
def test_rec_info(self):
p = self._queuepool_fixture(pool_size=1, max_overflow=0)