]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Improve typing and code in cursor module
authorFederico Caselli <cfederico87@gmail.com>
Thu, 9 Oct 2025 18:45:26 +0000 (20:45 +0200)
committerFederico Caselli <cfederico87@gmail.com>
Thu, 9 Oct 2025 18:49:36 +0000 (20:49 +0200)
Change-Id: I56b89f303f03dfc7056c7174ac06bc1679319bb5
(cherry picked from commit 16be119cc04d9327df78e0891bec40765bfd5d4f)

lib/sqlalchemy/engine/cursor.py

index 8e2348efab5c0da31ad9c282a0a7f21fd6267243..0dbc8e625b7aaf755655e99f1dbf7d9cf2cf2694 100644 (file)
@@ -52,6 +52,7 @@ from ..sql.compiler import RM_RENDERED_NAME
 from ..sql.compiler import RM_TYPE
 from ..sql.type_api import TypeEngine
 from ..util import compat
+from ..util.typing import Final
 from ..util.typing import Literal
 from ..util.typing import Self
 
@@ -79,45 +80,45 @@ _T = TypeVar("_T", bound=Any)
 # using raw tuple is faster than namedtuple.
 # these match up to the positions in
 # _CursorKeyMapRecType
-MD_INDEX: Literal[0] = 0
+MD_INDEX: Final[Literal[0]] = 0
 """integer index in cursor.description
 
 """
 
-MD_RESULT_MAP_INDEX: Literal[1] = 1
+MD_RESULT_MAP_INDEX: Final[Literal[1]] = 1
 """integer index in compiled._result_columns"""
 
-MD_OBJECTS: Literal[2] = 2
+MD_OBJECTS: Final[Literal[2]] = 2
 """other string keys and ColumnElement obj that can match.
 
 This comes from compiler.RM_OBJECTS / compiler.ResultColumnsEntry.objects
 
 """
 
-MD_LOOKUP_KEY: Literal[3] = 3
+MD_LOOKUP_KEY: Final[Literal[3]] = 3
 """string key we usually expect for key-based lookup
 
 this comes from compiler.RM_NAME / compiler.ResultColumnsEntry.name
 """
 
 
-MD_RENDERED_NAME: Literal[4] = 4
+MD_RENDERED_NAME: Final[Literal[4]] = 4
 """name that is usually in cursor.description
 
 this comes from compiler.RENDERED_NAME / compiler.ResultColumnsEntry.keyname
 """
 
 
-MD_PROCESSOR: Literal[5] = 5
+MD_PROCESSOR: Final[Literal[5]] = 5
 """callable to process a result value into a row"""
 
-MD_UNTRANSLATED: Literal[6] = 6
+MD_UNTRANSLATED: Final[Literal[6]] = 6
 """raw name from cursor.description"""
 
 
 _CursorKeyMapRecType = Tuple[
     Optional[int],  # MD_INDEX, None means the record is ambiguously named
-    int,  # MD_RESULT_MAP_INDEX
+    int,  # MD_RESULT_MAP_INDEX, -1 if MD_INDEX is None
     List[Any],  # MD_OBJECTS
     str,  # MD_LOOKUP_KEY
     str,  # MD_RENDERED_NAME
@@ -222,21 +223,17 @@ class CursorResultMetaData(ResultMetaData):
 
         keymap = dict(self._keymap)
         offset = len(self._keys)
-        keymap.update(
-            {
-                key: (
-                    # int index should be None for ambiguous key
-                    (
-                        value[0] + offset
-                        if value[0] is not None and key not in keymap
-                        else None
-                    ),
-                    value[1] + offset,
-                    *value[2:],
-                )
-                for key, value in other._keymap.items()
-            }
-        )
+
+        for key, value in other._keymap.items():
+            # int index should be None for ambiguous key
+            if value[MD_INDEX] is not None and key not in keymap:
+                md_index = value[MD_INDEX] + offset
+                md_object = value[MD_RESULT_MAP_INDEX] + offset
+            else:
+                md_index = None
+                md_object = -1
+            keymap[key] = (md_index, md_object, *value[2:])
+
         return self._make_new_metadata(
             unpickled=self._unpickled,
             processors=self._processors + other._processors,  # type: ignore
@@ -401,7 +398,7 @@ class CursorResultMetaData(ResultMetaData):
         # column keys and other names
         if num_ctx_cols:
             # keymap by primary string...
-            by_key = {
+            by_key: Dict[str, _CursorKeyMapRecType] = {
                 metadata_entry[MD_LOOKUP_KEY]: metadata_entry
                 for metadata_entry in raw
             }
@@ -447,7 +444,7 @@ class CursorResultMetaData(ResultMetaData):
                 # record into by_key.
                 by_key.update(
                     {
-                        key: (None, None, [], key, key, None, None)
+                        key: (None, -1, [], key, key, None, None)
                         for key in dupes
                     }
                 )