From c2094b70e130e8390a82347290bb0817d48d14f6 Mon Sep 17 00:00:00 2001 From: Federico Caselli Date: Sat, 4 May 2024 22:06:12 +0200 Subject: [PATCH] Use deque for the asnycio cursors Uniform the implementation of the asyncio adapted cursors to consistently use a deque as the internal row buffer. Change-Id: If074f06c672569dbdd326cdf7a75800c45336d66 --- lib/sqlalchemy/dialects/mysql/aiomysql.py | 21 +++++++++-------- lib/sqlalchemy/dialects/mysql/asyncmy.py | 20 ++++++++-------- lib/sqlalchemy/dialects/postgresql/asyncpg.py | 19 ++++++++------- lib/sqlalchemy/dialects/postgresql/psycopg.py | 23 ++++++++----------- lib/sqlalchemy/dialects/sqlite/aiosqlite.py | 20 ++++++++-------- 5 files changed, 49 insertions(+), 54 deletions(-) diff --git a/lib/sqlalchemy/dialects/mysql/aiomysql.py b/lib/sqlalchemy/dialects/mysql/aiomysql.py index 405fa82c8a..45e226b94e 100644 --- a/lib/sqlalchemy/dialects/mysql/aiomysql.py +++ b/lib/sqlalchemy/dialects/mysql/aiomysql.py @@ -27,6 +27,8 @@ This dialect should normally be used only with the """ # noqa +from collections import deque + from .pymysql import MySQLDialect_pymysql from ... import pool from ... import util @@ -57,7 +59,7 @@ class AsyncAdapt_aiomysql_cursor: # see https://github.com/aio-libs/aiomysql/issues/543 self._cursor = self.await_(cursor.__aenter__()) - self._rows = [] + self._rows = deque() @property def description(self): @@ -87,7 +89,7 @@ class AsyncAdapt_aiomysql_cursor: # exhausting rows, which we already have done for sync cursor. # another option would be to emulate aiosqlite dialect and assign # cursor only if we are doing server side cursor operation. - self._rows[:] = [] + self._rows.clear() def execute(self, operation, parameters=None): return self.await_(self._execute_async(operation, parameters)) @@ -106,7 +108,7 @@ class AsyncAdapt_aiomysql_cursor: # of that here since our default result is not async. # we could just as easily grab "_rows" here and be done with it # but this is safer. - self._rows = list(await self._cursor.fetchall()) + self._rows = deque(await self._cursor.fetchall()) return result async def _executemany_async(self, operation, seq_of_parameters): @@ -118,11 +120,11 @@ class AsyncAdapt_aiomysql_cursor: def __iter__(self): while self._rows: - yield self._rows.pop(0) + yield self._rows.popleft() def fetchone(self): if self._rows: - return self._rows.pop(0) + return self._rows.popleft() else: return None @@ -130,13 +132,12 @@ class AsyncAdapt_aiomysql_cursor: if size is None: size = self.arraysize - retval = self._rows[0:size] - self._rows[:] = self._rows[size:] - return retval + rr = self._rows + return [rr.popleft() for _ in range(min(size, len(rr)))] def fetchall(self): - retval = self._rows[:] - self._rows[:] = [] + retval = list(self._rows) + self._rows.clear() return retval diff --git a/lib/sqlalchemy/dialects/mysql/asyncmy.py b/lib/sqlalchemy/dialects/mysql/asyncmy.py index 7360044d20..474eb626d3 100644 --- a/lib/sqlalchemy/dialects/mysql/asyncmy.py +++ b/lib/sqlalchemy/dialects/mysql/asyncmy.py @@ -25,6 +25,7 @@ This dialect should normally be used only with the """ # noqa +from collections import deque from contextlib import asynccontextmanager from .pymysql import MySQLDialect_pymysql @@ -56,7 +57,7 @@ class AsyncAdapt_asyncmy_cursor: cursor = self._connection.cursor() self._cursor = self.await_(cursor.__aenter__()) - self._rows = [] + self._rows = deque() @property def description(self): @@ -86,7 +87,7 @@ class AsyncAdapt_asyncmy_cursor: # exhausting rows, which we already have done for sync cursor. # another option would be to emulate aiosqlite dialect and assign # cursor only if we are doing server side cursor operation. - self._rows[:] = [] + self._rows.clear() def execute(self, operation, parameters=None): return self.await_(self._execute_async(operation, parameters)) @@ -108,7 +109,7 @@ class AsyncAdapt_asyncmy_cursor: # of that here since our default result is not async. # we could just as easily grab "_rows" here and be done with it # but this is safer. - self._rows = list(await self._cursor.fetchall()) + self._rows = deque(await self._cursor.fetchall()) return result async def _executemany_async(self, operation, seq_of_parameters): @@ -120,11 +121,11 @@ class AsyncAdapt_asyncmy_cursor: def __iter__(self): while self._rows: - yield self._rows.pop(0) + yield self._rows.popleft() def fetchone(self): if self._rows: - return self._rows.pop(0) + return self._rows.popleft() else: return None @@ -132,13 +133,12 @@ class AsyncAdapt_asyncmy_cursor: if size is None: size = self.arraysize - retval = self._rows[0:size] - self._rows[:] = self._rows[size:] - return retval + rr = self._rows + return [rr.popleft() for _ in range(min(size, len(rr)))] def fetchall(self): - retval = self._rows[:] - self._rows[:] = [] + retval = list(self._rows) + self._rows.clear() return retval diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py index 12e711f52e..b00ce5a02d 100644 --- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py +++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py @@ -487,7 +487,7 @@ class AsyncAdapt_asyncpg_cursor: def __init__(self, adapt_connection): self._adapt_connection = adapt_connection self._connection = adapt_connection._connection - self._rows = [] + self._rows = deque() self._cursor = None self.description = None self.arraysize = 1 @@ -495,7 +495,7 @@ class AsyncAdapt_asyncpg_cursor: self._invalidate_schema_cache_asof = 0 def close(self): - self._rows[:] = [] + self._rows.clear() def _handle_exception(self, error): self._adapt_connection._handle_exception(error) @@ -535,7 +535,7 @@ class AsyncAdapt_asyncpg_cursor: self._cursor = await prepared_stmt.cursor(*parameters) self.rowcount = -1 else: - self._rows = await prepared_stmt.fetch(*parameters) + self._rows = deque(await prepared_stmt.fetch(*parameters)) status = prepared_stmt.get_statusmsg() reg = re.match( @@ -583,11 +583,11 @@ class AsyncAdapt_asyncpg_cursor: def __iter__(self): while self._rows: - yield self._rows.pop(0) + yield self._rows.popleft() def fetchone(self): if self._rows: - return self._rows.pop(0) + return self._rows.popleft() else: return None @@ -595,13 +595,12 @@ class AsyncAdapt_asyncpg_cursor: if size is None: size = self.arraysize - retval = self._rows[0:size] - self._rows[:] = self._rows[size:] - return retval + rr = self._rows + return [rr.popleft() for _ in range(min(size, len(rr)))] def fetchall(self): - retval = self._rows[:] - self._rows[:] = [] + retval = list(self._rows) + self._rows.clear() return retval diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg.py b/lib/sqlalchemy/dialects/postgresql/psycopg.py index a1ad0fc682..b8c0087dd4 100644 --- a/lib/sqlalchemy/dialects/postgresql/psycopg.py +++ b/lib/sqlalchemy/dialects/postgresql/psycopg.py @@ -85,6 +85,7 @@ specified:: """ # noqa from __future__ import annotations +from collections import deque import logging import re from typing import cast @@ -564,7 +565,7 @@ class AsyncAdapt_psycopg_cursor: def __init__(self, cursor, await_) -> None: self._cursor = cursor self.await_ = await_ - self._rows = [] + self._rows = deque() def __getattr__(self, name): return getattr(self._cursor, name) @@ -591,24 +592,19 @@ class AsyncAdapt_psycopg_cursor: # eq/ne if res and res.status == self._psycopg_ExecStatus.TUPLES_OK: rows = self.await_(self._cursor.fetchall()) - if not isinstance(rows, list): - self._rows = list(rows) - else: - self._rows = rows + self._rows = deque(rows) return result def executemany(self, query, params_seq): return self.await_(self._cursor.executemany(query, params_seq)) def __iter__(self): - # TODO: try to avoid pop(0) on a list while self._rows: - yield self._rows.pop(0) + yield self._rows.popleft() def fetchone(self): if self._rows: - # TODO: try to avoid pop(0) on a list - return self._rows.pop(0) + return self._rows.popleft() else: return None @@ -616,13 +612,12 @@ class AsyncAdapt_psycopg_cursor: if size is None: size = self._cursor.arraysize - retval = self._rows[0:size] - self._rows = self._rows[size:] - return retval + rr = self._rows + return [rr.popleft() for _ in range(min(size, len(rr)))] def fetchall(self): - retval = self._rows - self._rows = [] + retval = list(self._rows) + self._rows.clear() return retval diff --git a/lib/sqlalchemy/dialects/sqlite/aiosqlite.py b/lib/sqlalchemy/dialects/sqlite/aiosqlite.py index 6c915634d1..796a80cf06 100644 --- a/lib/sqlalchemy/dialects/sqlite/aiosqlite.py +++ b/lib/sqlalchemy/dialects/sqlite/aiosqlite.py @@ -78,6 +78,7 @@ The solution is similar to :ref:`pysqlite_serializable`. This is achieved by the """ # noqa import asyncio +from collections import deque from functools import partial from .base import SQLiteExecutionContext @@ -113,10 +114,10 @@ class AsyncAdapt_aiosqlite_cursor: self.arraysize = 1 self.rowcount = -1 self.description = None - self._rows = [] + self._rows = deque() def close(self): - self._rows[:] = [] + self._rows.clear() def execute(self, operation, parameters=None): try: @@ -132,7 +133,7 @@ class AsyncAdapt_aiosqlite_cursor: self.lastrowid = self.rowcount = -1 if not self.server_side: - self._rows = self.await_(_cursor.fetchall()) + self._rows = deque(self.await_(_cursor.fetchall())) else: self.description = None self.lastrowid = _cursor.lastrowid @@ -161,11 +162,11 @@ class AsyncAdapt_aiosqlite_cursor: def __iter__(self): while self._rows: - yield self._rows.pop(0) + yield self._rows.popleft() def fetchone(self): if self._rows: - return self._rows.pop(0) + return self._rows.popleft() else: return None @@ -173,13 +174,12 @@ class AsyncAdapt_aiosqlite_cursor: if size is None: size = self.arraysize - retval = self._rows[0:size] - self._rows[:] = self._rows[size:] - return retval + rr = self._rows + return [rr.popleft() for _ in range(min(size, len(rr)))] def fetchall(self): - retval = self._rows[:] - self._rows[:] = [] + retval = list(self._rows) + self._rows.clear() return retval -- 2.47.3