]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Use deque for the asnycio cursors
authorFederico Caselli <cfederico87@gmail.com>
Sat, 4 May 2024 20:06:12 +0000 (22:06 +0200)
committerFederico Caselli <cfederico87@gmail.com>
Tue, 28 May 2024 19:15:55 +0000 (21:15 +0200)
Uniform the implementation of the asyncio adapted cursors to
consistently use a deque as the internal row buffer.

Change-Id: If074f06c672569dbdd326cdf7a75800c45336d66

lib/sqlalchemy/dialects/mysql/aiomysql.py
lib/sqlalchemy/dialects/mysql/asyncmy.py
lib/sqlalchemy/dialects/postgresql/asyncpg.py
lib/sqlalchemy/dialects/postgresql/psycopg.py
lib/sqlalchemy/dialects/sqlite/aiosqlite.py

index 405fa82c8a57613747f4e6a4937e4a20058d3d32..45e226b94e02b3e000f7e22929a1b3dbbbcb23aa 100644 (file)
@@ -27,6 +27,8 @@ This dialect should normally be used only with the
 
 
 """  # noqa
+from collections import deque
+
 from .pymysql import MySQLDialect_pymysql
 from ... import pool
 from ... import util
@@ -57,7 +59,7 @@ class AsyncAdapt_aiomysql_cursor:
 
         # see https://github.com/aio-libs/aiomysql/issues/543
         self._cursor = self.await_(cursor.__aenter__())
-        self._rows = []
+        self._rows = deque()
 
     @property
     def description(self):
@@ -87,7 +89,7 @@ class AsyncAdapt_aiomysql_cursor:
         # exhausting rows, which we already have done for sync cursor.
         # another option would be to emulate aiosqlite dialect and assign
         # cursor only if we are doing server side cursor operation.
-        self._rows[:] = []
+        self._rows.clear()
 
     def execute(self, operation, parameters=None):
         return self.await_(self._execute_async(operation, parameters))
@@ -106,7 +108,7 @@ class AsyncAdapt_aiomysql_cursor:
                 # of that here since our default result is not async.
                 # we could just as easily grab "_rows" here and be done with it
                 # but this is safer.
-                self._rows = list(await self._cursor.fetchall())
+                self._rows = deque(await self._cursor.fetchall())
             return result
 
     async def _executemany_async(self, operation, seq_of_parameters):
@@ -118,11 +120,11 @@ class AsyncAdapt_aiomysql_cursor:
 
     def __iter__(self):
         while self._rows:
-            yield self._rows.pop(0)
+            yield self._rows.popleft()
 
     def fetchone(self):
         if self._rows:
-            return self._rows.pop(0)
+            return self._rows.popleft()
         else:
             return None
 
@@ -130,13 +132,12 @@ class AsyncAdapt_aiomysql_cursor:
         if size is None:
             size = self.arraysize
 
-        retval = self._rows[0:size]
-        self._rows[:] = self._rows[size:]
-        return retval
+        rr = self._rows
+        return [rr.popleft() for _ in range(min(size, len(rr)))]
 
     def fetchall(self):
-        retval = self._rows[:]
-        self._rows[:] = []
+        retval = list(self._rows)
+        self._rows.clear()
         return retval
 
 
index 7360044d20b1886860bcf160c8f29c8e443b7573..474eb626d36c451d908810f654bceaac85db58d2 100644 (file)
@@ -25,6 +25,7 @@ This dialect should normally be used only with the
 
 
 """  # noqa
+from collections import deque
 from contextlib import asynccontextmanager
 
 from .pymysql import MySQLDialect_pymysql
@@ -56,7 +57,7 @@ class AsyncAdapt_asyncmy_cursor:
         cursor = self._connection.cursor()
 
         self._cursor = self.await_(cursor.__aenter__())
-        self._rows = []
+        self._rows = deque()
 
     @property
     def description(self):
@@ -86,7 +87,7 @@ class AsyncAdapt_asyncmy_cursor:
         # exhausting rows, which we already have done for sync cursor.
         # another option would be to emulate aiosqlite dialect and assign
         # cursor only if we are doing server side cursor operation.
-        self._rows[:] = []
+        self._rows.clear()
 
     def execute(self, operation, parameters=None):
         return self.await_(self._execute_async(operation, parameters))
@@ -108,7 +109,7 @@ class AsyncAdapt_asyncmy_cursor:
                 # of that here since our default result is not async.
                 # we could just as easily grab "_rows" here and be done with it
                 # but this is safer.
-                self._rows = list(await self._cursor.fetchall())
+                self._rows = deque(await self._cursor.fetchall())
             return result
 
     async def _executemany_async(self, operation, seq_of_parameters):
@@ -120,11 +121,11 @@ class AsyncAdapt_asyncmy_cursor:
 
     def __iter__(self):
         while self._rows:
-            yield self._rows.pop(0)
+            yield self._rows.popleft()
 
     def fetchone(self):
         if self._rows:
-            return self._rows.pop(0)
+            return self._rows.popleft()
         else:
             return None
 
@@ -132,13 +133,12 @@ class AsyncAdapt_asyncmy_cursor:
         if size is None:
             size = self.arraysize
 
-        retval = self._rows[0:size]
-        self._rows[:] = self._rows[size:]
-        return retval
+        rr = self._rows
+        return [rr.popleft() for _ in range(min(size, len(rr)))]
 
     def fetchall(self):
-        retval = self._rows[:]
-        self._rows[:] = []
+        retval = list(self._rows)
+        self._rows.clear()
         return retval
 
 
index 12e711f52e23685c5cbfd5e88aee9615836aa39a..b00ce5a02da639715c7d3709ec62db616e523fc0 100644 (file)
@@ -487,7 +487,7 @@ class AsyncAdapt_asyncpg_cursor:
     def __init__(self, adapt_connection):
         self._adapt_connection = adapt_connection
         self._connection = adapt_connection._connection
-        self._rows = []
+        self._rows = deque()
         self._cursor = None
         self.description = None
         self.arraysize = 1
@@ -495,7 +495,7 @@ class AsyncAdapt_asyncpg_cursor:
         self._invalidate_schema_cache_asof = 0
 
     def close(self):
-        self._rows[:] = []
+        self._rows.clear()
 
     def _handle_exception(self, error):
         self._adapt_connection._handle_exception(error)
@@ -535,7 +535,7 @@ class AsyncAdapt_asyncpg_cursor:
                     self._cursor = await prepared_stmt.cursor(*parameters)
                     self.rowcount = -1
                 else:
-                    self._rows = await prepared_stmt.fetch(*parameters)
+                    self._rows = deque(await prepared_stmt.fetch(*parameters))
                     status = prepared_stmt.get_statusmsg()
 
                     reg = re.match(
@@ -583,11 +583,11 @@ class AsyncAdapt_asyncpg_cursor:
 
     def __iter__(self):
         while self._rows:
-            yield self._rows.pop(0)
+            yield self._rows.popleft()
 
     def fetchone(self):
         if self._rows:
-            return self._rows.pop(0)
+            return self._rows.popleft()
         else:
             return None
 
@@ -595,13 +595,12 @@ class AsyncAdapt_asyncpg_cursor:
         if size is None:
             size = self.arraysize
 
-        retval = self._rows[0:size]
-        self._rows[:] = self._rows[size:]
-        return retval
+        rr = self._rows
+        return [rr.popleft() for _ in range(min(size, len(rr)))]
 
     def fetchall(self):
-        retval = self._rows[:]
-        self._rows[:] = []
+        retval = list(self._rows)
+        self._rows.clear()
         return retval
 
 
index a1ad0fc6821803e935decfc969bc3efd5d37d0b2..b8c0087dd491dcd761c6c8970c715bcb1c52e51e 100644 (file)
@@ -85,6 +85,7 @@ specified::
 """  # noqa
 from __future__ import annotations
 
+from collections import deque
 import logging
 import re
 from typing import cast
@@ -564,7 +565,7 @@ class AsyncAdapt_psycopg_cursor:
     def __init__(self, cursor, await_) -> None:
         self._cursor = cursor
         self.await_ = await_
-        self._rows = []
+        self._rows = deque()
 
     def __getattr__(self, name):
         return getattr(self._cursor, name)
@@ -591,24 +592,19 @@ class AsyncAdapt_psycopg_cursor:
         # eq/ne
         if res and res.status == self._psycopg_ExecStatus.TUPLES_OK:
             rows = self.await_(self._cursor.fetchall())
-            if not isinstance(rows, list):
-                self._rows = list(rows)
-            else:
-                self._rows = rows
+            self._rows = deque(rows)
         return result
 
     def executemany(self, query, params_seq):
         return self.await_(self._cursor.executemany(query, params_seq))
 
     def __iter__(self):
-        # TODO: try to avoid pop(0) on a list
         while self._rows:
-            yield self._rows.pop(0)
+            yield self._rows.popleft()
 
     def fetchone(self):
         if self._rows:
-            # TODO: try to avoid pop(0) on a list
-            return self._rows.pop(0)
+            return self._rows.popleft()
         else:
             return None
 
@@ -616,13 +612,12 @@ class AsyncAdapt_psycopg_cursor:
         if size is None:
             size = self._cursor.arraysize
 
-        retval = self._rows[0:size]
-        self._rows = self._rows[size:]
-        return retval
+        rr = self._rows
+        return [rr.popleft() for _ in range(min(size, len(rr)))]
 
     def fetchall(self):
-        retval = self._rows
-        self._rows = []
+        retval = list(self._rows)
+        self._rows.clear()
         return retval
 
 
index 6c915634d119dfd3d0680c515d645126c4e32af9..796a80cf0607bb65c4388ece37e7a186bc2b7de9 100644 (file)
@@ -78,6 +78,7 @@ The solution is similar to :ref:`pysqlite_serializable`. This is achieved by the
 """  # noqa
 
 import asyncio
+from collections import deque
 from functools import partial
 
 from .base import SQLiteExecutionContext
@@ -113,10 +114,10 @@ class AsyncAdapt_aiosqlite_cursor:
         self.arraysize = 1
         self.rowcount = -1
         self.description = None
-        self._rows = []
+        self._rows = deque()
 
     def close(self):
-        self._rows[:] = []
+        self._rows.clear()
 
     def execute(self, operation, parameters=None):
         try:
@@ -132,7 +133,7 @@ class AsyncAdapt_aiosqlite_cursor:
                 self.lastrowid = self.rowcount = -1
 
                 if not self.server_side:
-                    self._rows = self.await_(_cursor.fetchall())
+                    self._rows = deque(self.await_(_cursor.fetchall()))
             else:
                 self.description = None
                 self.lastrowid = _cursor.lastrowid
@@ -161,11 +162,11 @@ class AsyncAdapt_aiosqlite_cursor:
 
     def __iter__(self):
         while self._rows:
-            yield self._rows.pop(0)
+            yield self._rows.popleft()
 
     def fetchone(self):
         if self._rows:
-            return self._rows.pop(0)
+            return self._rows.popleft()
         else:
             return None
 
@@ -173,13 +174,12 @@ class AsyncAdapt_aiosqlite_cursor:
         if size is None:
             size = self.arraysize
 
-        retval = self._rows[0:size]
-        self._rows[:] = self._rows[size:]
-        return retval
+        rr = self._rows
+        return [rr.popleft() for _ in range(min(size, len(rr)))]
 
     def fetchall(self):
-        retval = self._rows[:]
-        self._rows[:] = []
+        retval = list(self._rows)
+        self._rows.clear()
         return retval