]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Add name_func optional attribute for asyncpg adapter
authorPavel Sirotkin <pav.pnz@gmail.com>
Thu, 20 Apr 2023 17:40:04 +0000 (13:40 -0400)
committerFederico Caselli <cfederico87@gmail.com>
Fri, 21 Apr 2023 17:47:26 +0000 (19:47 +0200)
I faced an issue related to pg bouncer and prepared statement cache flow in asyncpg dialect. Regarding this discussion https://github.com/sqlalchemy/sqlalchemy/issues/6467 I prepared PR to support an optional parameter `name` in prepared statement which is allowed, since 0.25.0 version in `asyncpg` https://github.com/MagicStack/asyncpg/pull/846

**UPD:**
the issue with proposal: https://github.com/sqlalchemy/sqlalchemy/issues/9608

### Description
Added optional parameter `name_func` to `AsyncAdapt_asyncpg_connection` class which will call on the `self._connection.prepare()` function and populate a unique name.

so in general instead this

```python

from uuid import uuid4

from asyncpg import Connection

class CConnection(Connection):
    def _get_unique_id(self, prefix: str) -> str:
        return f'__asyncpg_{prefix}_{uuid4()}__'

engine = create_async_engine(...,
    connect_args={
        'connection_class': CConnection,
    },
)

```

would be enough

```python
from uuid import uuid4

engine = create_async_engine(...,
    connect_args={
        'name_func': lambda:  f'__asyncpg_{uuid4()}__',
    },
)

```

### Checklist
<!-- go over following points. check them with an `x` if they do apply, (they turn into clickable checkboxes once the PR is submitted, so no need to do everything at once)

-->

This pull request is:

- [ ] A documentation / typographical error fix
- Good to go, no issue or tests are needed
- [ ] A short code fix
- please include the issue number, and create an issue if none exists, which
  must include a complete example of the issue.  one line code fixes without an
  issue and demonstration will not be accepted.
- Please include: `Fixes: #<issue number>` in the commit message
- please include tests.   one line code fixes without tests will not be accepted.
- [x] A new feature implementation
- please include the issue number, and create an issue if none exists, which must
  include a complete example of how the feature would look.
- Please include: `Fixes: #<issue number>` in the commit message
- please include tests.

**Have a nice day!**

Fixes: #9608
Closes: #9607
Pull-request: https://github.com/sqlalchemy/sqlalchemy/pull/9607
Pull-request-sha: b4bc8d3e57ab095a26112830ad4bea36083454e3

Change-Id: Icd753366cba166b8a60d1c8566377ec8335cd828

doc/build/changelog/unreleased_20/9608.rst [new file with mode: 0644]
lib/sqlalchemy/dialects/postgresql/asyncpg.py
test/dialect/postgresql/test_async_pg_py3k.py

diff --git a/doc/build/changelog/unreleased_20/9608.rst b/doc/build/changelog/unreleased_20/9608.rst
new file mode 100644 (file)
index 0000000..a1b06cb
--- /dev/null
@@ -0,0 +1,13 @@
+.. change::
+    :tags: usecase, postgresql
+    :tickets: 9608
+
+    Added ``prepared_statement_name_func`` connection argument option in the
+    asyncpg dialect. This option allow passing a callable used to customize
+    the name of the prepared statement that will be created by the driver
+    when executing the queries.
+    Pull request curtesy of Pavel Sirotkin.
+
+    .. seealso::
+
+        :ref:`asyncpg_prepared_statement_name`
index 2acc5fea300e0259c92d70e9080c8e70355a9fc2..f0b4562d2b439df9ac2aeb9425ce1c3e66f688d4 100644 (file)
@@ -98,6 +98,44 @@ To disable the prepared statement cache, use a value of zero::
    stale, nor can it retry the statement as the PostgreSQL transaction is
    invalidated when these errors occur.
 
+.. _asyncpg_prepared_statement_name:
+
+Prepared Statement Name
+-----------------------
+
+By default, asyncpg enumerates prepared statements in numeric order, which
+can lead to errors if a name has already been taken for another prepared
+statement. This issue can arise if your application uses database proxies
+such as PgBouncer to handle connections. One possible workaround is to
+use dynamic prepared statement names, which asyncpg now supports through
+an optional name value for the statement name. This allows you to
+generate your own unique names that won't conflict with existing ones.
+To achieve this, you can provide a function that will be called every time
+a prepared statement is prepared::
+
+    from uuid import uuid4
+
+    engine = create_async_engine(
+        "postgresql+asyncpg://user:pass@hostname/dbname",
+        poolclass=NullPool,
+        connect_args={
+            'prepared_statement_name_func': lambda:  f'__asyncpg_{uuid4()}__',
+        },
+    )
+
+.. seealso::
+
+   https://github.com/MagicStack/asyncpg/issues/837
+
+   https://github.com/sqlalchemy/sqlalchemy/issues/6467
+
+.. warning:: To prevent a buildup of useless prepared statements in
+   your application, it's important to use the NullPool poolclass and
+   PgBouncer with a configured `DISCARD https://www.postgresql.org/docs/current/sql-discard.html`_
+   setup. The DISCARD command is used to release resources held by the db connection,
+   including prepared statements. Without proper setup, prepared statements can
+   accumulate quickly and cause performance issues.
+
 Disabling the PostgreSQL JIT to improve ENUM datatype handling
 ---------------------------------------------------------------
 
@@ -642,13 +680,20 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection):
         "_transaction",
         "_started",
         "_prepared_statement_cache",
+        "_prepared_statement_name_func",
         "_invalidate_schema_cache_asof",
         "_execute_mutex",
     )
 
     await_ = staticmethod(await_only)
 
-    def __init__(self, dbapi, connection, prepared_statement_cache_size=100):
+    def __init__(
+        self,
+        dbapi,
+        connection,
+        prepared_statement_cache_size=100,
+        prepared_statement_name_func=None,
+    ):
         self.dbapi = dbapi
         self._connection = connection
         self.isolation_level = self._isolation_setting = "read_committed"
@@ -666,6 +711,11 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection):
         else:
             self._prepared_statement_cache = None
 
+        if prepared_statement_name_func:
+            self._prepared_statement_name_func = prepared_statement_name_func
+        else:
+            self._prepared_statement_name_func = self._default_name_func
+
     async def _check_type_cache_invalidation(self, invalidate_timestamp):
         if invalidate_timestamp > self._invalidate_schema_cache_asof:
             await self._connection.reload_schema_state()
@@ -676,7 +726,9 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection):
 
         cache = self._prepared_statement_cache
         if cache is None:
-            prepared_stmt = await self._connection.prepare(operation)
+            prepared_stmt = await self._connection.prepare(
+                operation, name=self._prepared_statement_name_func()
+            )
             attributes = prepared_stmt.get_attributes()
             return prepared_stmt, attributes
 
@@ -692,7 +744,9 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection):
             if cached_timestamp > invalidate_timestamp:
                 return prepared_stmt, attributes
 
-        prepared_stmt = await self._connection.prepare(operation)
+        prepared_stmt = await self._connection.prepare(
+            operation, name=self._prepared_statement_name_func()
+        )
         attributes = prepared_stmt.get_attributes()
         cache[operation] = (prepared_stmt, attributes, time.time())
 
@@ -792,6 +846,10 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection):
     def terminate(self):
         self._connection.terminate()
 
+    @staticmethod
+    def _default_name_func():
+        return None
+
 
 class AsyncAdaptFallback_asyncpg_connection(AsyncAdapt_asyncpg_connection):
     __slots__ = ()
@@ -809,17 +867,23 @@ class AsyncAdapt_asyncpg_dbapi:
         prepared_statement_cache_size = kw.pop(
             "prepared_statement_cache_size", 100
         )
+        prepared_statement_name_func = kw.pop(
+            "prepared_statement_name_func", None
+        )
+
         if util.asbool(async_fallback):
             return AsyncAdaptFallback_asyncpg_connection(
                 self,
                 await_fallback(self.asyncpg.connect(*arg, **kw)),
                 prepared_statement_cache_size=prepared_statement_cache_size,
+                prepared_statement_name_func=prepared_statement_name_func,
             )
         else:
             return AsyncAdapt_asyncpg_connection(
                 self,
                 await_only(self.asyncpg.connect(*arg, **kw)),
                 prepared_statement_cache_size=prepared_statement_cache_size,
+                prepared_statement_name_func=prepared_statement_name_func,
             )
 
     class Error(Exception):
index d9116a7cecc815d8c9809caff97ad0fb705c085c..49014fcaf94144c1414803693ecefc24bdcf55b2 100644 (file)
@@ -1,4 +1,5 @@
 import random
+import uuid
 
 from sqlalchemy import Column
 from sqlalchemy import exc
@@ -272,3 +273,19 @@ class AsyncPgTest(fixtures.TestBase):
             await conn.close()
 
         eq_(codec_meth.mock_calls, [mock.call(adapted_conn)])
+
+    @async_test
+    async def test_name_connection_func(self, metadata, async_testing_engine):
+        cache = []
+
+        def name_f():
+            name = str(uuid.uuid4())
+            cache.append(name)
+            return name
+
+        engine = async_testing_engine(
+            options={"connect_args": {"prepared_statement_name_func": name_f}},
+        )
+        async with engine.begin() as conn:
+            await conn.execute(select(1))
+            assert len(cache) > 0