use is not supported and will be removed in a future release.
.. change::
- :tags: orm, use_case
+ :tags: orm, usecase
:tickets: 9297
To accommodate a change in column ordering used by ORM Declarative in
--- /dev/null
+.. change::
+ :tags: usecase, pool
+ :tickets: 9625
+
+ Added :func:`_sa.create_pool_from_url` and
+ :func:`_asyncio.create_async_pool_from_url` to create
+ a :class:`_pool.Pool` instance from an input url passed as string
+ or :class:`_sa.URL`.
.. autofunction:: sqlalchemy.engine.make_url
+.. autofunction:: sqlalchemy.create_pool_from_url
.. autoclass:: sqlalchemy.engine.URL
:members:
active :class:`_orm.Connection` instances; again prefer to create new
:class:`_orm.Session` objects in new processes.
+Using a pool instance directly
+------------------------------
+
+A pool implementation can be used directly without an engine. This could be used
+in applications that just whish to use the pool behavior without all other
+SQLAlchemy features.
+In the example below the default pool for the ``MySQLdb`` dialect is obtained using
+:func:`_sa.create_pool_from_url`::
+
+ from sqlalchemy import create_pool_from_url
+
+ my_pool = create_pool_from_url(
+ "mysql+mysqldb://", max_overflow=5, pool_size=5, pre_ping=True
+ )
+
+ con = my_pool.connect()
+ # use the connection
+ ...
+ # then close it
+ con.close()
+
+If the type of pool to create is not specified, the default one for the dialect
+will be used. To specify it directly the ``poolclass`` argument can be used,
+like in the following example::
+
+ from sqlalchemy import create_pool_from_url
+ from sqlalchemy import NullPool
+
+ my_pool = create_pool_from_url("mysql+mysqldb://", poolclass=NullPool)
API Documentation - Available Pool Implementations
--------------------------------------------------
.. autofunction:: async_engine_from_config
+.. autofunction:: create_async_pool_from_url
+
.. autoclass:: AsyncEngine
:members:
from .engine import Connection as Connection
from .engine import create_engine as create_engine
from .engine import create_mock_engine as create_mock_engine
+from .engine import create_pool_from_url as create_pool_from_url
from .engine import CreateEnginePlugin as CreateEnginePlugin
from .engine import CursorResult as CursorResult
from .engine import Dialect as Dialect
from .base import Transaction as Transaction
from .base import TwoPhaseTransaction as TwoPhaseTransaction
from .create import create_engine as create_engine
+from .create import create_pool_from_url as create_pool_from_url
from .create import engine_from_config as engine_from_config
from .cursor import CursorResult as CursorResult
from .cursor import ResultProxy as ResultProxy
from ..pool import _AdhocProxiedConnection
from ..pool import ConnectionPoolEntry
from ..sql import compiler
+from ..util import immutabledict
if typing.TYPE_CHECKING:
from .base import Engine
# consume pool arguments from kwargs, translating a few of
# the arguments
- translate = {
- "logging_name": "pool_logging_name",
- "echo": "echo_pool",
- "timeout": "pool_timeout",
- "recycle": "pool_recycle",
- "events": "pool_events",
- "reset_on_return": "pool_reset_on_return",
- "pre_ping": "pool_pre_ping",
- "use_lifo": "pool_use_lifo",
- }
for k in util.get_cls_kwargs(poolclass):
- tk = translate.get(k, k)
+ tk = _pool_translate_kwargs.get(k, k)
if tk in kwargs:
pool_args[k] = pop_kwarg(tk)
options.update(kwargs)
url = options.pop("url")
return create_engine(url, **options)
+
+
+@overload
+def create_pool_from_url(
+ url: Union[str, URL],
+ *,
+ poolclass: Optional[Type[Pool]] = ...,
+ logging_name: str = ...,
+ pre_ping: bool = ...,
+ size: int = ...,
+ recycle: int = ...,
+ reset_on_return: Optional[_ResetStyleArgType] = ...,
+ timeout: float = ...,
+ use_lifo: bool = ...,
+ **kwargs: Any,
+) -> Pool:
+ ...
+
+
+@overload
+def create_pool_from_url(url: Union[str, URL], **kwargs: Any) -> Pool:
+ ...
+
+
+def create_pool_from_url(url: Union[str, URL], **kwargs: Any) -> Pool:
+ """Create a pool instance from the given url.
+
+ If ``poolclass`` is not provided the pool class used
+ is selected using the dialect specified in the URL.
+
+ The arguments passed to :func:`_sa.create_pool_from_url` are
+ identical to the pool argument passed to the :func:`_sa.create_engine`
+ function.
+
+ .. versionadded:: 2.0.10
+ """
+
+ for key in _pool_translate_kwargs:
+ if key in kwargs:
+ kwargs[_pool_translate_kwargs[key]] = kwargs.pop(key)
+
+ engine = create_engine(url, **kwargs, _initialize=False)
+ return engine.pool
+
+
+_pool_translate_kwargs = immutabledict(
+ {
+ "logging_name": "pool_logging_name",
+ "echo": "echo_pool",
+ "timeout": "pool_timeout",
+ "recycle": "pool_recycle",
+ "events": "pool_events", # deprecated
+ "reset_on_return": "pool_reset_on_return",
+ "pre_ping": "pool_pre_ping",
+ "use_lifo": "pool_use_lifo",
+ }
+)
from .engine import AsyncEngine as AsyncEngine
from .engine import AsyncTransaction as AsyncTransaction
from .engine import create_async_engine as create_async_engine
+from .engine import create_async_pool_from_url as create_async_pool_from_url
from .result import AsyncMappingResult as AsyncMappingResult
from .result import AsyncResult as AsyncResult
from .result import AsyncScalarResult as AsyncScalarResult
from ... import util
from ...engine import Connection
from ...engine import create_engine as _create_engine
+from ...engine import create_pool_from_url as _create_pool_from_url
from ...engine import Engine
from ...engine.base import NestedTransaction
from ...engine.base import Transaction
"use the connection.stream() method for an async "
"streaming result set"
)
- kw["future"] = True
kw["_is_async"] = True
sync_engine = _create_engine(url, **kw)
return AsyncEngine(sync_engine)
return create_async_engine(url, **options)
+def create_async_pool_from_url(url: Union[str, URL], **kwargs: Any) -> Pool:
+ """Create a new async engine instance.
+
+ Arguments passed to :func:`_asyncio.create_async_pool_from_url` are mostly
+ identical to those passed to the :func:`_sa.create_pool_from_url` function.
+ The specified dialect must be an asyncio-compatible dialect
+ such as :ref:`dialect-postgresql-asyncpg`.
+
+ .. versionadded:: 2.0.10
+
+ """
+ kwargs["_is_async"] = True
+ return _create_pool_from_url(url, **kwargs)
+
+
class AsyncConnectable:
__slots__ = "_slots_dispatch", "__weakref__"
from unittest.mock import call
from unittest.mock import MagicMock
from unittest.mock import Mock
+from unittest.mock import patch
import sqlalchemy as tsa
from sqlalchemy import create_engine
+from sqlalchemy import create_pool_from_url
from sqlalchemy import engine_from_config
from sqlalchemy import exc
from sqlalchemy import pool
from sqlalchemy.dialects import registry
from sqlalchemy.engine.default import DefaultDialect
import sqlalchemy.engine.url as url
+from sqlalchemy.pool.impl import NullPool
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import eq_
+from sqlalchemy.testing import fixture
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing import is_false
ne_(successes, 0, "No default drivers found.")
+class CreatePoolTest(fixtures.TestBase):
+ @fixture
+ def mock_create(self):
+ with patch(
+ "sqlalchemy.engine.create.create_engine",
+ ) as p:
+ yield p
+
+ def test_url_only(self, mock_create):
+ create_pool_from_url("sqlite://")
+ mock_create.assert_called_once_with("sqlite://", _initialize=False)
+
+ def test_pool_args(self, mock_create):
+ create_pool_from_url(
+ "sqlite://",
+ logging_name="foo",
+ echo=True,
+ timeout=42,
+ recycle=22,
+ reset_on_return=True,
+ pre_ping=True,
+ use_lifo=True,
+ foo=99,
+ )
+ mock_create.assert_called_once_with(
+ "sqlite://",
+ pool_logging_name="foo",
+ echo_pool=True,
+ pool_timeout=42,
+ pool_recycle=22,
+ pool_reset_on_return=True,
+ pool_pre_ping=True,
+ pool_use_lifo=True,
+ foo=99,
+ _initialize=False,
+ )
+
+ def test_pool_creation(self):
+ pp = create_pool_from_url("sqlite://")
+ engine_pool = create_engine("sqlite://").pool
+ eq_(pp.__class__, engine_pool.__class__)
+ pp = create_pool_from_url("sqlite://", pre_ping=True)
+ is_true(pp._pre_ping)
+ is_false(isinstance(pp, NullPool))
+
+ def test_pool_creation_custom_class(self):
+ pp = create_pool_from_url("sqlite://", poolclass=NullPool)
+ is_true(isinstance(pp, NullPool))
+
+
class TestRegNewDBAPI(fixtures.TestBase):
def test_register_base(self):
registry.register("mockdialect", __name__, "MockDialect")
import asyncio
import inspect as stdlib_inspect
+from unittest.mock import patch
from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy.engine import cursor as _cursor
from sqlalchemy.ext.asyncio import async_engine_from_config
from sqlalchemy.ext.asyncio import create_async_engine
+from sqlalchemy.ext.asyncio import create_async_pool_from_url
from sqlalchemy.ext.asyncio import engine as _async_engine
from sqlalchemy.ext.asyncio import exc as async_exc
from sqlalchemy.ext.asyncio import exc as asyncio_exc
assert engine.dialect.is_async is True
+class AsyncCreatePoolTest(fixtures.TestBase):
+ @config.fixture
+ def mock_create(self):
+ with patch(
+ "sqlalchemy.ext.asyncio.engine._create_pool_from_url",
+ ) as p:
+ yield p
+
+ def test_url_only(self, mock_create):
+ create_async_pool_from_url("sqlite://")
+ mock_create.assert_called_once_with("sqlite://", _is_async=True)
+
+ def test_pool_args(self, mock_create):
+ create_async_pool_from_url("sqlite://", foo=99, echo=True)
+ mock_create.assert_called_once_with(
+ "sqlite://", foo=99, echo=True, _is_async=True
+ )
+
+
class AsyncEventTest(EngineFixture):
"""The engine events all run in their normal synchronous context.