from typing import MutableMapping
from typing import NoReturn
from typing import Optional
+from typing import overload
from typing import Sequence
from typing import Set
from typing import Tuple
from . import elements
from . import type_api
from .elements import BindParameter
+ from .elements import ClauseList
from .elements import ColumnClause # noqa
from .elements import ColumnElement
from .elements import KeyedColumnElement
__slots__ = "_collection", "_index", "_colset"
_collection: List[Tuple[_COLKEY, _COL_co]]
- _index: Dict[Union[None, str, int], _COL_co]
+ _index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]]
_colset: Set[_COL_co]
def __init__(
if columns:
self._initial_populate(columns)
+ @util.preload_module("sqlalchemy.sql.elements")
+ def __clause_element__(self) -> ClauseList:
+ elements = util.preloaded.sql_elements
+
+ return elements.ClauseList(
+ _literal_as_text_role=roles.ColumnsClauseRole,
+ group=False,
+ *self._all_columns,
+ )
+
def _initial_populate(
self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
) -> None:
@property
def _all_columns(self) -> List[_COL_co]:
- return [col for (k, col) in self._collection]
+ return [col for (_, col) in self._collection]
def keys(self) -> List[_COLKEY]:
"""Return a sequence of string key names for all columns in this
collection."""
- return [k for (k, col) in self._collection]
+ return [k for (k, _) in self._collection]
def values(self) -> List[_COL_co]:
"""Return a sequence of :class:`_sql.ColumnClause` or
:class:`_schema.Column` objects for all columns in this
collection."""
- return [col for (k, col) in self._collection]
+ return [col for (_, col) in self._collection]
def items(self) -> List[Tuple[_COLKEY, _COL_co]]:
"""Return a sequence of (key, column) tuples for all columns in this
def __iter__(self) -> Iterator[_COL_co]:
# turn to a list first to maintain over a course of changes
- return iter([col for k, col in self._collection])
+ return iter([col for _, col in self._collection])
+ @overload
def __getitem__(self, key: Union[str, int]) -> _COL_co:
+ ...
+
+ @overload
+ def __getitem__(
+ self, key: Tuple[Union[str, int], ...]
+ ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
+ ...
+
+ def __getitem__(
+ self, key: Union[str, int, Tuple[Union[str, int], ...]]
+ ) -> Union[ReadOnlyColumnCollection[_COLKEY, _COL_co], _COL_co]:
try:
- return self._index[key]
+ if isinstance(key, tuple):
+ return ColumnCollection( # type: ignore
+ [self._index[sub_key] for sub_key in key]
+ ).as_readonly()
+ else:
+ return self._index[key][1]
except KeyError as err:
- if isinstance(key, int):
- raise IndexError(key) from err
+ if isinstance(err.args[0], int):
+ raise IndexError(err.args[0]) from err
else:
raise
def __getattr__(self, key: str) -> _COL_co:
try:
- return self._index[key]
+ return self._index[key][1]
except KeyError as err:
raise AttributeError(key) from err
:class:`_expression.ColumnCollection`."""
if key in self._index:
- return self._index[key]
+ return self._index[key][1]
else:
return default
self._collection[:] = cols
self._colset.update(c for k, c in self._collection)
self._index.update(
- (idx, c) for idx, (k, c) in enumerate(self._collection)
+ (idx, (k, c)) for idx, (k, c) in enumerate(self._collection)
+ )
+ self._index.update(
+ {k: (k, col) for k, col in reversed(self._collection)}
)
- self._index.update({k: col for k, col in reversed(self._collection)})
def add(
self, column: ColumnElement[Any], key: Optional[_COLKEY] = None
self._collection.append((colkey, _column))
self._colset.add(_column)
- self._index[l] = _column
+ self._index[l] = (colkey, _column)
if colkey not in self._index:
- self._index[colkey] = _column
+ self._index[colkey] = (colkey, _column)
def __getstate__(self) -> Dict[str, Any]:
- return {"_collection": self._collection, "_index": self._index}
+ return {
+ "_collection": self._collection,
+ "_index": self._index,
+ }
def __setstate__(self, state: Dict[str, Any]) -> None:
object.__setattr__(self, "_index", state["_index"])
col, intersect = None, None
target_set = column.proxy_set
- cols = [c for (k, c) in self._collection]
+ cols = [c for (_, c) in self._collection]
for c in cols:
expanded_proxy_set = set(_expand_cloned(c.proxy_set))
i = target_set.intersection(expanded_proxy_set)
if key in self._index:
- existing = self._index[key]
+ existing = self._index[key][1]
if existing is named_column:
return
l = len(self._collection)
self._collection.append((key, named_column))
self._colset.add(named_column)
- self._index[l] = named_column
- self._index[key] = named_column
+ self._index[l] = (key, named_column)
+ self._index[key] = (key, named_column)
def _populate_separate_keys(
self, iter_: Iterable[Tuple[str, _NAMEDCOL]]
elif col.key in self._index:
replace_col.append(col)
else:
- self._index[k] = col
+ self._index[k] = (k, col)
self._collection.append((k, col))
self._colset.update(c for (k, c) in self._collection)
self._index.update(
- (idx, c) for idx, (k, c) in enumerate(self._collection)
+ (idx, (k, c)) for idx, (k, c) in enumerate(self._collection)
)
for col in replace_col:
self.replace(col)
]
self._index.update(
- {idx: col for idx, (k, col) in enumerate(self._collection)}
+ {idx: (k, col) for idx, (k, col) in enumerate(self._collection)}
)
# delete higher index
del self._index[len(self._collection)]
remove_col = set()
# remove up to two columns based on matches of name as well as key
if column.name in self._index and column.key != column.name:
- other = self._index[column.name]
+ other = self._index[column.name][1]
if other.name == other.key:
remove_col.add(other)
if column.key in self._index:
- remove_col.add(self._index[column.key])
+ remove_col.add(self._index[column.key][1])
new_cols: List[Tuple[str, _NAMEDCOL]] = []
replaced = False
self._index.clear()
self._index.update(
- {idx: col for idx, (k, col) in enumerate(self._collection)}
+ {idx: (k, col) for idx, (k, col) in enumerate(self._collection)}
)
- self._index.update(self._collection)
+ self._index.update({k: (k, col) for (k, col) in self._collection})
class ReadOnlyColumnCollection(
eq_(coll._colset, set(c for k, c in coll._collection))
d = {}
for k, col in coll._collection:
- d.setdefault(k, col)
- d.update({idx: col for idx, (k, col) in enumerate(coll._collection)})
+ d.setdefault(k, (k, col))
+ d.update(
+ {idx: (k, col) for idx, (k, col) in enumerate(coll._collection)}
+ )
eq_(coll._index, d)
def test_keys(self):
ci = cc.as_readonly()
eq_(ci.items(), [("c1", c1), ("foo", c2), ("c3", c3)])
+ def test_getitem_tuple_str(self):
+ c1, c2, c3 = sql.column("c1"), sql.column("c2"), sql.column("c3")
+ c2.key = "foo"
+ cc = self._column_collection(
+ columns=[("c1", c1), ("foo", c2), ("c3", c3)]
+ )
+ sub_cc = cc["c3", "foo"]
+ is_(sub_cc.c3, c3)
+ eq_(list(sub_cc), [c3, c2])
+
+ def test_getitem_tuple_int(self):
+ c1, c2, c3 = sql.column("c1"), sql.column("c2"), sql.column("c3")
+ c2.key = "foo"
+ cc = self._column_collection(
+ columns=[("c1", c1), ("foo", c2), ("c3", c3)]
+ )
+
+ sub_cc = cc[2, 1]
+ is_(sub_cc.c3, c3)
+ eq_(list(sub_cc), [c3, c2])
+
def test_key_index_error(self):
cc = self._column_collection(
columns=[
from sqlalchemy.sql import table
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import AssertsCompiledSQL
+from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_raises_message
from sqlalchemy.testing import fixtures
+from sqlalchemy.testing import is_
table1 = table(
"mytable",
" %(joiner)s SELECT :param_2 AS anon_2"
" %(joiner)s SELECT :param_3 AS anon_3" % {"joiner": joiner},
)
+
+
+class ColumnCollectionAsSelectTest(fixtures.TestBase, AssertsCompiledSQL):
+ """tests related to #8285."""
+
+ __dialect__ = "default"
+
+ def test_c_collection_as_from(self):
+ stmt = select(parent.c)
+
+ # this works because _all_selected_columns expands out
+ # ClauseList. it does so in the same way that it works for
+ # Table already. so this is free
+ eq_(stmt._all_selected_columns, [parent.c.id, parent.c.data])
+
+ self.assert_compile(stmt, "SELECT parent.id, parent.data FROM parent")
+
+ def test_c_sub_collection_str_stmt(self):
+ stmt = select(table1.c["myid", "description"])
+
+ self.assert_compile(
+ stmt, "SELECT mytable.myid, mytable.description FROM mytable"
+ )
+
+ subq = stmt.subquery()
+ self.assert_compile(
+ select(subq.c[0]).where(subq.c.description == "x"),
+ "SELECT anon_1.myid FROM (SELECT mytable.myid AS myid, "
+ "mytable.description AS description FROM mytable) AS anon_1 "
+ "WHERE anon_1.description = :description_1",
+ )
+
+ def test_c_sub_collection_int_stmt(self):
+ stmt = select(table1.c[2, 0])
+
+ self.assert_compile(
+ stmt, "SELECT mytable.description, mytable.myid FROM mytable"
+ )
+
+ subq = stmt.subquery()
+ self.assert_compile(
+ select(subq.c.myid).where(subq.c[1] == "x"),
+ "SELECT anon_1.myid FROM (SELECT mytable.description AS "
+ "description, mytable.myid AS myid FROM mytable) AS anon_1 "
+ "WHERE anon_1.myid = :myid_1",
+ )
+
+ def test_c_sub_collection_str(self):
+ coll = table1.c["myid", "description"]
+ is_(coll.myid, table1.c.myid)
+
+ eq_(list(coll), [table1.c.myid, table1.c.description])
+
+ def test_c_sub_collection_int(self):
+ coll = table1.c[2, 0]
+
+ is_(coll.myid, table1.c.myid)
+
+ eq_(list(coll), [table1.c.description, table1.c.myid])
+
+ def test_missing_key(self):
+
+ with expect_raises_message(KeyError, "unknown"):
+ table1.c["myid", "unknown"]
+
+ def test_missing_index(self):
+
+ with expect_raises_message(IndexError, "5"):
+ table1.c["myid", 5]