From: Pavel Sirotkin Date: Thu, 20 Apr 2023 17:40:04 +0000 (-0400) Subject: Add name_func optional attribute for asyncpg adapter X-Git-Tag: rel_2_0_10~3^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=244c29768254d12ff18bb342b154a009080345d6;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git Add name_func optional attribute for asyncpg adapter I faced an issue related to pg bouncer and prepared statement cache flow in asyncpg dialect. Regarding this discussion https://github.com/sqlalchemy/sqlalchemy/issues/6467 I prepared PR to support an optional parameter `name` in prepared statement which is allowed, since 0.25.0 version in `asyncpg` https://github.com/MagicStack/asyncpg/pull/846 **UPD:** the issue with proposal: https://github.com/sqlalchemy/sqlalchemy/issues/9608 ### Description Added optional parameter `name_func` to `AsyncAdapt_asyncpg_connection` class which will call on the `self._connection.prepare()` function and populate a unique name. so in general instead this ```python from uuid import uuid4 from asyncpg import Connection class CConnection(Connection): def _get_unique_id(self, prefix: str) -> str: return f'__asyncpg_{prefix}_{uuid4()}__' engine = create_async_engine(..., connect_args={ 'connection_class': CConnection, }, ) ``` would be enough ```python from uuid import uuid4 engine = create_async_engine(..., connect_args={ 'name_func': lambda: f'__asyncpg_{uuid4()}__', }, ) ``` ### Checklist This pull request is: - [ ] A documentation / typographical error fix - Good to go, no issue or tests are needed - [ ] A short code fix - please include the issue number, and create an issue if none exists, which must include a complete example of the issue. one line code fixes without an issue and demonstration will not be accepted. - Please include: `Fixes: #` in the commit message - please include tests. one line code fixes without tests will not be accepted. - [x] A new feature implementation - please include the issue number, and create an issue if none exists, which must include a complete example of how the feature would look. - Please include: `Fixes: #` in the commit message - please include tests. **Have a nice day!** Fixes: #9608 Closes: #9607 Pull-request: https://github.com/sqlalchemy/sqlalchemy/pull/9607 Pull-request-sha: b4bc8d3e57ab095a26112830ad4bea36083454e3 Change-Id: Icd753366cba166b8a60d1c8566377ec8335cd828 --- diff --git a/doc/build/changelog/unreleased_20/9608.rst b/doc/build/changelog/unreleased_20/9608.rst new file mode 100644 index 0000000000..a1b06cbe38 --- /dev/null +++ b/doc/build/changelog/unreleased_20/9608.rst @@ -0,0 +1,13 @@ +.. change:: + :tags: usecase, postgresql + :tickets: 9608 + + Added ``prepared_statement_name_func`` connection argument option in the + asyncpg dialect. This option allow passing a callable used to customize + the name of the prepared statement that will be created by the driver + when executing the queries. + Pull request curtesy of Pavel Sirotkin. + + .. seealso:: + + :ref:`asyncpg_prepared_statement_name` diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py index 2acc5fea30..f0b4562d2b 100644 --- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py +++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py @@ -98,6 +98,44 @@ To disable the prepared statement cache, use a value of zero:: stale, nor can it retry the statement as the PostgreSQL transaction is invalidated when these errors occur. +.. _asyncpg_prepared_statement_name: + +Prepared Statement Name +----------------------- + +By default, asyncpg enumerates prepared statements in numeric order, which +can lead to errors if a name has already been taken for another prepared +statement. This issue can arise if your application uses database proxies +such as PgBouncer to handle connections. One possible workaround is to +use dynamic prepared statement names, which asyncpg now supports through +an optional name value for the statement name. This allows you to +generate your own unique names that won't conflict with existing ones. +To achieve this, you can provide a function that will be called every time +a prepared statement is prepared:: + + from uuid import uuid4 + + engine = create_async_engine( + "postgresql+asyncpg://user:pass@hostname/dbname", + poolclass=NullPool, + connect_args={ + 'prepared_statement_name_func': lambda: f'__asyncpg_{uuid4()}__', + }, + ) + +.. seealso:: + + https://github.com/MagicStack/asyncpg/issues/837 + + https://github.com/sqlalchemy/sqlalchemy/issues/6467 + +.. warning:: To prevent a buildup of useless prepared statements in + your application, it's important to use the NullPool poolclass and + PgBouncer with a configured `DISCARD https://www.postgresql.org/docs/current/sql-discard.html`_ + setup. The DISCARD command is used to release resources held by the db connection, + including prepared statements. Without proper setup, prepared statements can + accumulate quickly and cause performance issues. + Disabling the PostgreSQL JIT to improve ENUM datatype handling --------------------------------------------------------------- @@ -642,13 +680,20 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection): "_transaction", "_started", "_prepared_statement_cache", + "_prepared_statement_name_func", "_invalidate_schema_cache_asof", "_execute_mutex", ) await_ = staticmethod(await_only) - def __init__(self, dbapi, connection, prepared_statement_cache_size=100): + def __init__( + self, + dbapi, + connection, + prepared_statement_cache_size=100, + prepared_statement_name_func=None, + ): self.dbapi = dbapi self._connection = connection self.isolation_level = self._isolation_setting = "read_committed" @@ -666,6 +711,11 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection): else: self._prepared_statement_cache = None + if prepared_statement_name_func: + self._prepared_statement_name_func = prepared_statement_name_func + else: + self._prepared_statement_name_func = self._default_name_func + async def _check_type_cache_invalidation(self, invalidate_timestamp): if invalidate_timestamp > self._invalidate_schema_cache_asof: await self._connection.reload_schema_state() @@ -676,7 +726,9 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection): cache = self._prepared_statement_cache if cache is None: - prepared_stmt = await self._connection.prepare(operation) + prepared_stmt = await self._connection.prepare( + operation, name=self._prepared_statement_name_func() + ) attributes = prepared_stmt.get_attributes() return prepared_stmt, attributes @@ -692,7 +744,9 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection): if cached_timestamp > invalidate_timestamp: return prepared_stmt, attributes - prepared_stmt = await self._connection.prepare(operation) + prepared_stmt = await self._connection.prepare( + operation, name=self._prepared_statement_name_func() + ) attributes = prepared_stmt.get_attributes() cache[operation] = (prepared_stmt, attributes, time.time()) @@ -792,6 +846,10 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection): def terminate(self): self._connection.terminate() + @staticmethod + def _default_name_func(): + return None + class AsyncAdaptFallback_asyncpg_connection(AsyncAdapt_asyncpg_connection): __slots__ = () @@ -809,17 +867,23 @@ class AsyncAdapt_asyncpg_dbapi: prepared_statement_cache_size = kw.pop( "prepared_statement_cache_size", 100 ) + prepared_statement_name_func = kw.pop( + "prepared_statement_name_func", None + ) + if util.asbool(async_fallback): return AsyncAdaptFallback_asyncpg_connection( self, await_fallback(self.asyncpg.connect(*arg, **kw)), prepared_statement_cache_size=prepared_statement_cache_size, + prepared_statement_name_func=prepared_statement_name_func, ) else: return AsyncAdapt_asyncpg_connection( self, await_only(self.asyncpg.connect(*arg, **kw)), prepared_statement_cache_size=prepared_statement_cache_size, + prepared_statement_name_func=prepared_statement_name_func, ) class Error(Exception): diff --git a/test/dialect/postgresql/test_async_pg_py3k.py b/test/dialect/postgresql/test_async_pg_py3k.py index d9116a7cec..49014fcaf9 100644 --- a/test/dialect/postgresql/test_async_pg_py3k.py +++ b/test/dialect/postgresql/test_async_pg_py3k.py @@ -1,4 +1,5 @@ import random +import uuid from sqlalchemy import Column from sqlalchemy import exc @@ -272,3 +273,19 @@ class AsyncPgTest(fixtures.TestBase): await conn.close() eq_(codec_meth.mock_calls, [mock.call(adapted_conn)]) + + @async_test + async def test_name_connection_func(self, metadata, async_testing_engine): + cache = [] + + def name_f(): + name = str(uuid.uuid4()) + cache.append(name) + return name + + engine = async_testing_engine( + options={"connect_args": {"prepared_statement_name_func": name_f}}, + ) + async with engine.begin() as conn: + await conn.execute(select(1)) + assert len(cache) > 0