The connection object returned by :meth:`_engine.Engine.raw_connection`
now supports the context manager protocol, automatically returning the
connection to the pool when exiting the context.
Fixes: #13116
Change-Id: I51eb1fd61b772368f12a787e5f60db153a839e70
--- /dev/null
+.. change::
+ :tags: usecase, engine
+ :tickets: 13116
+
+ The connection object returned by :meth:`_engine.Engine.raw_connection`
+ now supports the context manager protocol, automatically returning the
+ connection to the pool when exiting the context.
from .. import exc
from .. import log
from .. import util
from .. import exc
from .. import log
from .. import util
+from ..util.typing import Self
if TYPE_CHECKING:
from ..engine.interfaces import DBAPIConnection
if TYPE_CHECKING:
from ..engine.interfaces import DBAPIConnection
"""
raise NotImplementedError()
"""
raise NotImplementedError()
+ def __enter__(self) -> Self:
+ return self
+
+ def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
+ self.close()
+ return None
+
class _AdhocProxiedConnection(PoolProxiedConnection):
"""provides the :class:`.PoolProxiedConnection` interface for cases where
class _AdhocProxiedConnection(PoolProxiedConnection):
"""provides the :class:`.PoolProxiedConnection` interface for cases where
from sqlalchemy.testing import is_not_none
from sqlalchemy.testing import is_true
from sqlalchemy.testing import mock
from sqlalchemy.testing import is_not_none
from sqlalchemy.testing import is_true
from sqlalchemy.testing import mock
+from sqlalchemy.testing.assertions import expect_raises_message
from sqlalchemy.testing.engines import testing_engine
from sqlalchemy.testing.util import gc_collect
from sqlalchemy.testing.util import lazy_gc
from sqlalchemy.testing.engines import testing_engine
from sqlalchemy.testing.util import gc_collect
from sqlalchemy.testing.util import lazy_gc
assert not c2.info
assert "foo2" in c.info
assert not c2.info
assert "foo2" in c.info
+ def test_raw_connection_context(self):
+ p = self._queuepool_fixture(pool_size=1, max_overflow=0)
+ e = create_engine("sqlite://", pool=p, _initialize=False)
+
+ eq_(p.checkedout(), 0)
+ conn = e.raw_connection()
+ eq_(p.checkedout(), 1)
+ with conn as wc:
+ eq_(p.checkedout(), 1)
+ is_(conn, wc)
+
+ eq_(p.checkedout(), 0)
+
+ with expect_raises_message(ValueError, "an error"):
+ with e.raw_connection() as wc:
+ eq_(p.checkedout(), 1)
+ raise ValueError("an error")
+ eq_(p.checkedout(), 0)
+
def test_rec_info(self):
p = self._queuepool_fixture(pool_size=1, max_overflow=0)
def test_rec_info(self):
p = self._queuepool_fixture(pool_size=1, max_overflow=0)