--- /dev/null
+.. change::
+ :tags: bug, orm
+ :tickets: 4719
+
+ Calling the :meth:`.Query.instances` method without passing a
+ :class:`.QueryContext` is deprecated. The original use case for this was
+ that a :class:`.Query` could yield ORM objects when given only the entities
+ to be selected as well as a DBAPI cursor object. However, for this to work
+ correctly there is essential metadata that is passed from a SQLAlchemy
+ :class:`.ResultProxy` that is derived from the mapped column expressions,
+ which comes originally from the :class:`.QueryContext`. To retrieve ORM
+ results from arbitrary SELECT statements, the :meth:`.Query.from_statement`
+ method should be used.
+
--- /dev/null
+.. change::
+ :tags: bug, engine
+ :tickets: 4877
+
+ Deprecated the behavior by which a :class:`.Column` can be used as the key
+ in a result set row lookup, when that :class:`.Column` is not part of the
+ SQL selectable that is being selected; that is, it is only matched on name.
+ A deprecation warning is now emitted for this case. Various ORM use
+ cases, such as those involving :func:`.text` constructs, have been improved
+ so that this fallback logic is avoided in most cases.
+
return super(MSSQLCompiler, self).visit_binary(binary, **kwargs)
def returning_clause(self, stmt, returning_cols):
+ # SQL server returning clause requires that the columns refer to
+ # the virtual table names "inserted" or "deleted". Here, we make
+ # a simple alias of our table with that name, and then adapt the
+ # columns we have from the list of RETURNING columns to that new name
+ # so that they render as "inserted.<colname>" / "deleted.<colname>".
if self.isinsert or self.isupdate:
target = stmt.table.alias("inserted")
adapter = sql_util.ClauseAdapter(target)
+ # adapter.traverse() takes a column from our target table and returns
+ # the one that is linked to the "inserted" / "deleted" tables. So in
+ # order to retrieve these values back from the result (e.g. like
+ # row[column]), tell the compiler to also add the original unadapted
+ # column to the result map. Before #4877, these were (unknowingly)
+ # falling back using string name matching in the result set which
+ # necessarily used an expensive KeyError in order to match.
+
columns = [
self._label_select_column(
- None, adapter.traverse(c), True, False, {}
+ None,
+ adapter.traverse(c),
+ True,
+ False,
+ {"result_map_targets": (c,)},
)
for c in expression._select_iterables(returning_cols)
]
compiled._result_columns,
compiled._ordered_columns,
compiled._textual_ordered_columns,
+ compiled._loose_column_name_matching,
)
self.unicode_statement = util.text_type(compiled)
result_columns,
cols_are_ordered,
textual_ordered,
+ loose_column_name_matching,
) = context.result_column_struct
num_ctx_cols = len(result_columns)
else:
result_columns = (
cols_are_ordered
- ) = num_ctx_cols = textual_ordered = False
+ ) = (
+ num_ctx_cols
+ ) = loose_column_name_matching = textual_ordered = False
# merge cursor.description with the column info
# present in the compiled structure, if any
num_ctx_cols,
cols_are_ordered,
textual_ordered,
+ loose_column_name_matching,
)
self._keymap = {}
num_ctx_cols,
cols_are_ordered,
textual_ordered,
+ loose_column_name_matching,
):
"""Merge a cursor.description with compiled result column information.
# compiled SQL with a mismatch of description cols
# vs. compiled cols, or textual w/ unordered columns
raw_iterator = self._merge_cols_by_name(
- context, cursor_description, result_columns
+ context,
+ cursor_description,
+ result_columns,
+ loose_column_name_matching,
)
else:
# no compiled SQL, just a raw string
yield idx, colname, mapped_type, coltype, obj, untranslated
- def _merge_cols_by_name(self, context, cursor_description, result_columns):
+ def _merge_cols_by_name(
+ self,
+ context,
+ cursor_description,
+ result_columns,
+ loose_column_name_matching,
+ ):
dialect = context.dialect
case_sensitive = dialect.case_sensitive
match_map = self._create_description_match_map(
- result_columns, case_sensitive
+ result_columns, case_sensitive, loose_column_name_matching
)
-
self.matched_on_name = True
for (
idx,
@classmethod
def _create_description_match_map(
- cls, result_columns, case_sensitive=True
+ cls,
+ result_columns,
+ case_sensitive=True,
+ loose_column_name_matching=False,
):
"""when matching cursor.description to a set of names that are present
in a Compiled object, as is the case with TextualSelect, get all the
d = {}
for elem in result_columns:
- key, rec = (
- elem[RM_RENDERED_NAME],
- (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE]),
- )
+ key = elem[RM_RENDERED_NAME]
if not case_sensitive:
key = key.lower()
if key in d:
- # conflicting keyname, just double up the list
- # of objects. this will cause an "ambiguous name"
- # error if an attempt is made by the result set to
- # access.
+ # conflicting keyname - just add the column-linked objects
+ # to the existing record. if there is a duplicate column
+ # name in the cursor description, this will allow all of those
+ # objects to raise an ambiguous column error
e_name, e_obj, e_type = d[key]
- d[key] = e_name, e_obj + rec[1], e_type
+ d[key] = e_name, e_obj + elem[RM_OBJECTS], e_type
else:
- d[key] = rec
-
+ d[key] = (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE])
+
+ if loose_column_name_matching:
+ # when using a textual statement with an unordered set
+ # of columns that line up, we are expecting the user
+ # to be using label names in the SQL that match to the column
+ # expressions. Enable more liberal matching for this case;
+ # duplicate keys that are ambiguous will be fixed later.
+ for r_key in elem[RM_OBJECTS]:
+ d.setdefault(
+ r_key, (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE])
+ )
return d
def _key_fallback(self, key, raiseerr=True):
break
else:
result = None
+ if result is not None:
+ if result[MD_OBJECTS] is _UNPICKLED:
+ util.warn_deprecated(
+ "Retreiving row values using Column objects from a "
+ "row that was unpickled is deprecated; adequate "
+ "state cannot be pickled for this to be efficient. "
+ "This usage will raise KeyError in a future release."
+ )
+ else:
+ util.warn_deprecated(
+ "Retreiving row values using Column objects with only "
+ "matching names as keys is deprecated, and will raise "
+ "KeyError in a future release; only Column "
+ "objects that are explicitly part of the statement "
+ "object should be used."
+ )
if result is None:
if raiseerr:
raise exc.NoSuchColumnError(
"""
statement = coercions.expect(roles.SelectStatementRole, statement)
-
- # TODO: coercions above should have this handled
- assert isinstance(
- statement, (expression.TextClause, expression.SelectBase)
- )
-
self._statement = statement
def first(self):
]
]
- def instances(self, cursor, __context=None):
- """Given a ResultProxy cursor as returned by connection.execute(),
- return an ORM result as an iterator.
+ def instances(self, result_proxy, context=None):
+ """Return an ORM result given a :class:`.ResultProxy` and
+ :class:`.QueryContext`.
- e.g.::
-
- result = engine.execute("select * from users")
- for u in session.query(User).instances(result):
- print u
"""
- context = __context
if context is None:
+ util.warn_deprecated(
+ "Using the Query.instances() method without a context "
+ "is deprecated and will be disallowed in a future release. "
+ "Please make use of :meth:`.Query.from_statement` "
+ "for linking ORM results to arbitrary select constructs."
+ )
context = QueryContext(self)
- return loading.instances(self, cursor, context)
+ return loading.instances(self, result_proxy, context)
def merge_result(self, iterator, load=True):
"""Merge a result into this :class:`.Query` object's Session.
context = QueryContext(self)
if context.statement is not None:
+ if isinstance(context.statement, expression.TextClause):
+ # setup for all entities, including contains_eager entities.
+ for entity in self._entities:
+ entity.setup_context(self, context)
+ context.statement = expression.TextualSelect(
+ context.statement,
+ context.primary_columns,
+ positional=False,
+ )
+ else:
+ # allow TextualSelect with implicit columns as well
+ # as select() with ad-hoc columns, see test_query::TextTest
+ self._from_obj_alias = sql.util.ColumnAdapter(
+ context.statement, adapt_on_names=True
+ )
+
return context
context.labels = not for_statement or self._with_labels
if ("fetch_column", self) in context.attributes:
column = context.attributes[("fetch_column", self)]
else:
- column = query._adapt_clause(self.column, False, True)
+ column = self.column
+ if query._from_obj_alias:
+ column = query._from_obj_alias.columns[column]
if column._annotations:
# annotated columns perform more slowly in compiler and
self.statement = query._statement.apply_labels()
else:
self.statement = query._statement
+ self.order_by = None
else:
self.statement = None
self.from_clause = query._from_obj
if not isinstance(alias, str):
info = inspect(alias)
alias = info.selectable
+ else:
+ util.warn_deprecated(
+ "Passing a string name for the 'alias' argument to "
+ "'contains_eager()` is deprecated, and will not work in a "
+ "future release. Please use a sqlalchemy.alias() or "
+ "sqlalchemy.orm.aliased() construct."
+ )
elif getattr(attr, "_of_type", None):
ot = inspect(attr._of_type)
True unless using an unordered TextualSelect.
"""
+ _loose_column_name_matching = False
+ """tell the result object that the SQL staement is textual, wants to match
+ up to Column objects, and may be using the ._label in the SELECT rather
+ than the base name.
+
+ """
+
_numeric_binds = False
"""
True if paramstyle is "numeric". This paramstyle is trickier than
within_label_clause=False,
within_columns_clause=False,
render_label_as_label=None,
+ result_map_targets=(),
**kw
):
# only render labels within the columns clause
add_to_result_map(
labelname,
label.name,
- (label, labelname) + label._alt_names,
+ (label, labelname) + label._alt_names + result_map_targets,
label.type,
)
)
def visit_column(
- self, column, add_to_result_map=None, include_table=True, **kwargs
+ self,
+ column,
+ add_to_result_map=None,
+ include_table=True,
+ result_map_targets=(),
+ **kwargs
):
name = orig_name = column.name
if name is None:
if add_to_result_map is not None:
add_to_result_map(
- name, orig_name, (column, name, column.key), column.type
+ name,
+ orig_name,
+ (column, name, column.key, column._label) + result_map_targets,
+ column.type,
)
if is_literal:
self._ordered_columns = (
self._textual_ordered_columns
) = taf.positional
+
+ # enable looser result column matching when the SQL text links to
+ # Column objects by name only
+ self._loose_column_name_matching = not taf.positional and bool(
+ taf.column_args
+ )
+
for c in taf.column_args:
self.process(
c,
label = quoted_name(label, t.name.quote)
# ensure the label name doesn't conflict with that
- # of an existing column
+ # of an existing column. note that this implies that any
+ # Column must **not** set up its _label before its parent table
+ # has all of its other Column objects set up. There are several
+ # tables in the test suite which will fail otherwise; example:
+ # table "owner" has columns "name" and "owner_name". Therefore
+ # column owner.name cannot use the label "owner_name", it has
+ # to be "owner_name_1".
if label in t.c:
_label = label
counter = 1
c._proxies = [self]
if selectable._is_clone_of is not None:
c._is_clone_of = selectable._is_clone_of.columns.get(c.key)
-
return c.key, c
def __init__(self, text, columns, positional=False):
self.element = text
- self.column_args = columns
+ # convert for ORM attributes->columns, etc
+ self.column_args = [
+ coercions.expect(roles.ColumnsClauseRole, c) for c in columns
+ ]
self.positional = positional
@SelectBase._memoized_property
if newcol is not None:
return newcol
if self.adapt_on_names and newcol is None:
- # TODO: this should be changed to .exported_columns if and
- # when we need to be able to adapt a plain Select statement
- newcol = self.selectable.c.get(col.name)
+ newcol = self.selectable.exported_columns.get(col.name)
return newcol
def replace(self, col):
from sqlalchemy.orm import comparable_property
from sqlalchemy.orm import composite
from sqlalchemy.orm import configure_mappers
+from sqlalchemy.orm import contains_alias
from sqlalchemy.orm import contains_eager
from sqlalchemy.orm import create_session
from sqlalchemy.orm import defer
from . import _fixtures
from .inheritance import _poly_fixtures
from .test_options import PathTest as OptionsPathTest
+from .test_query import QueryTest
from .test_transaction import _LocalFixture
addresses,
non_primary=True,
)
+
+
+class InstancesTest(QueryTest, AssertsCompiledSQL):
+ def test_from_alias_one(self):
+ User, addresses, users = (
+ self.classes.User,
+ self.tables.addresses,
+ self.tables.users,
+ )
+
+ query = (
+ users.select(users.c.id == 7)
+ .union(users.select(users.c.id > 7))
+ .alias("ulist")
+ .outerjoin(addresses)
+ .select(
+ use_labels=True, order_by=[text("ulist.id"), addresses.c.id]
+ )
+ )
+ sess = create_session()
+ q = sess.query(User)
+
+ # note this has multiple problems because we aren't giving Query
+ # the statement where it would be able to create an adapter
+ def go():
+ with testing.expect_deprecated(
+ r"Using the Query.instances\(\) method without a context",
+ "Retreiving row values using Column objects with only "
+ "matching names",
+ ):
+ result = list(
+ q.options(
+ contains_alias("ulist"), contains_eager("addresses")
+ ).instances(query.execute())
+ )
+ assert self.static.user_address_result == result
+
+ self.assert_sql_count(testing.db, go, 1)
+
+ def test_contains_eager(self):
+ users, addresses, User = (
+ self.tables.users,
+ self.tables.addresses,
+ self.classes.User,
+ )
+
+ sess = create_session()
+
+ selectquery = users.outerjoin(addresses).select(
+ users.c.id < 10,
+ use_labels=True,
+ order_by=[users.c.id, addresses.c.id],
+ )
+ q = sess.query(User)
+
+ def go():
+ with testing.expect_deprecated(
+ r"Using the Query.instances\(\) method without a context"
+ ):
+ result = list(
+ q.options(contains_eager("addresses")).instances(
+ selectquery.execute()
+ )
+ )
+ assert self.static.user_address_result[0:3] == result
+
+ self.assert_sql_count(testing.db, go, 1)
+
+ sess.expunge_all()
+
+ def go():
+ with testing.expect_deprecated(
+ r"Using the Query.instances\(\) method without a context"
+ ):
+ result = list(
+ q.options(contains_eager(User.addresses)).instances(
+ selectquery.execute()
+ )
+ )
+ assert self.static.user_address_result[0:3] == result
+
+ self.assert_sql_count(testing.db, go, 1)
+
+ def test_contains_eager_string_alias(self):
+ addresses, users, User = (
+ self.tables.addresses,
+ self.tables.users,
+ self.classes.User,
+ )
+
+ sess = create_session()
+ q = sess.query(User)
+
+ adalias = addresses.alias("adalias")
+ selectquery = users.outerjoin(adalias).select(
+ use_labels=True, order_by=[users.c.id, adalias.c.id]
+ )
+
+ # note this has multiple problems because we aren't giving Query
+ # the statement where it would be able to create an adapter
+ def go():
+ with testing.expect_deprecated(
+ r"Using the Query.instances\(\) method without a context",
+ r"Passing a string name for the 'alias' argument to "
+ r"'contains_eager\(\)` is deprecated",
+ "Retreiving row values using Column objects with only "
+ "matching names",
+ ):
+ result = list(
+ q.options(
+ contains_eager("addresses", alias="adalias")
+ ).instances(selectquery.execute())
+ )
+ assert self.static.user_address_result == result
+
+ self.assert_sql_count(testing.db, go, 1)
+
+ def test_contains_eager_aliased_instances(self):
+ addresses, users, User = (
+ self.tables.addresses,
+ self.tables.users,
+ self.classes.User,
+ )
+
+ sess = create_session()
+ q = sess.query(User)
+
+ adalias = addresses.alias("adalias")
+ selectquery = users.outerjoin(adalias).select(
+ use_labels=True, order_by=[users.c.id, adalias.c.id]
+ )
+
+ # note this has multiple problems because we aren't giving Query
+ # the statement where it would be able to create an adapter
+ def go():
+ with testing.expect_deprecated(
+ r"Using the Query.instances\(\) method without a context"
+ ):
+ result = list(
+ q.options(
+ contains_eager("addresses", alias=adalias)
+ ).instances(selectquery.execute())
+ )
+ assert self.static.user_address_result == result
+
+ self.assert_sql_count(testing.db, go, 1)
+
+ def test_contains_eager_multi_string_alias(self):
+ orders, items, users, order_items, User = (
+ self.tables.orders,
+ self.tables.items,
+ self.tables.users,
+ self.tables.order_items,
+ self.classes.User,
+ )
+
+ sess = create_session()
+ q = sess.query(User)
+
+ oalias = orders.alias("o1")
+ ialias = items.alias("i1")
+ query = (
+ users.outerjoin(oalias)
+ .outerjoin(order_items)
+ .outerjoin(ialias)
+ .select(use_labels=True)
+ .order_by(users.c.id, oalias.c.id, ialias.c.id)
+ )
+
+ # test using string alias with more than one level deep
+ def go():
+ with testing.expect_deprecated(
+ r"Using the Query.instances\(\) method without a context",
+ r"Passing a string name for the 'alias' argument to "
+ r"'contains_eager\(\)` is deprecated",
+ "Retreiving row values using Column objects with only "
+ "matching names",
+ ):
+ result = list(
+ q.options(
+ contains_eager("orders", alias="o1"),
+ contains_eager("orders.items", alias="i1"),
+ ).instances(query.execute())
+ )
+ assert self.static.user_order_result == result
+
+ self.assert_sql_count(testing.db, go, 1)
+
+ def test_contains_eager_multi_alias(self):
+ orders, items, users, order_items, User = (
+ self.tables.orders,
+ self.tables.items,
+ self.tables.users,
+ self.tables.order_items,
+ self.classes.User,
+ )
+
+ sess = create_session()
+ q = sess.query(User)
+
+ oalias = orders.alias("o1")
+ ialias = items.alias("i1")
+ query = (
+ users.outerjoin(oalias)
+ .outerjoin(order_items)
+ .outerjoin(ialias)
+ .select(use_labels=True)
+ .order_by(users.c.id, oalias.c.id, ialias.c.id)
+ )
+
+ # test using Alias with more than one level deep
+
+ # new way:
+ # from sqlalchemy.orm.strategy_options import Load
+ # opt = Load(User).contains_eager('orders', alias=oalias).
+ # contains_eager('items', alias=ialias)
+
+ def go():
+ with testing.expect_deprecated(
+ r"Using the Query.instances\(\) method without a context"
+ ):
+ result = list(
+ q.options(
+ contains_eager("orders", alias=oalias),
+ contains_eager("orders.items", alias=ialias),
+ ).instances(query.execute())
+ )
+ assert self.static.user_order_result == result
+
+ self.assert_sql_count(testing.db, go, 1)
class InstancesTest(QueryTest, AssertsCompiledSQL):
- def test_from_alias_one(self):
- User, addresses, users = (
- self.classes.User,
- self.tables.addresses,
- self.tables.users,
- )
-
- query = (
- users.select(users.c.id == 7)
- .union(users.select(users.c.id > 7))
- .alias("ulist")
- .outerjoin(addresses)
- .select(
- use_labels=True, order_by=[text("ulist.id"), addresses.c.id]
- )
- )
- sess = create_session()
- q = sess.query(User)
-
- def go():
- result = list(
- q.options(
- contains_alias("ulist"), contains_eager("addresses")
- ).instances(query.execute())
- )
- assert self.static.user_address_result == result
-
- self.assert_sql_count(testing.db, go, 1)
-
def test_from_alias_two(self):
User, addresses, users = (
self.classes.User,
self.assert_sql_count(testing.db, go, 1)
- def test_contains_eager(self):
- users, addresses, User = (
- self.tables.users,
- self.tables.addresses,
- self.classes.User,
- )
+ def test_contains_eager_one(self):
+ addresses, User = (self.tables.addresses, self.classes.User)
sess = create_session()
assert self.static.user_address_result == q.all()
self.assert_sql_count(testing.db, go, 1)
- sess.expunge_all()
+
+ def test_contains_eager_two(self):
+ users, addresses, User = (
+ self.tables.users,
+ self.tables.addresses,
+ self.classes.User,
+ )
+
+ sess = create_session()
adalias = addresses.alias()
q = (
eq_(self.static.user_address_result, q.all())
self.assert_sql_count(testing.db, go, 1)
- sess.expunge_all()
+
+ def test_contains_eager_four(self):
+ users, addresses, User = (
+ self.tables.users,
+ self.tables.addresses,
+ self.classes.User,
+ )
+
+ sess = create_session()
selectquery = users.outerjoin(addresses).select(
users.c.id < 10,
use_labels=True,
order_by=[users.c.id, addresses.c.id],
)
- q = sess.query(User)
-
- def go():
- result = list(
- q.options(contains_eager("addresses")).instances(
- selectquery.execute()
- )
- )
- assert self.static.user_address_result[0:3] == result
- self.assert_sql_count(testing.db, go, 1)
-
- sess.expunge_all()
-
- def go():
- result = list(
- q.options(contains_eager(User.addresses)).instances(
- selectquery.execute()
- )
- )
- assert self.static.user_address_result[0:3] == result
-
- self.assert_sql_count(testing.db, go, 1)
- sess.expunge_all()
+ q = sess.query(User)
def go():
result = (
self.assert_sql_count(testing.db, go, 1)
- def test_contains_eager_string_alias(self):
- addresses, users, User = (
- self.tables.addresses,
- self.tables.users,
- self.classes.User,
- )
-
- sess = create_session()
- q = sess.query(User)
-
- adalias = addresses.alias("adalias")
- selectquery = users.outerjoin(adalias).select(
- use_labels=True, order_by=[users.c.id, adalias.c.id]
- )
-
- # string alias name
- def go():
- result = list(
- q.options(
- contains_eager("addresses", alias="adalias")
- ).instances(selectquery.execute())
- )
- assert self.static.user_address_result == result
-
- self.assert_sql_count(testing.db, go, 1)
-
- def test_contains_eager_aliased_instances(self):
- addresses, users, User = (
- self.tables.addresses,
- self.tables.users,
- self.classes.User,
- )
-
- sess = create_session()
- q = sess.query(User)
-
- adalias = addresses.alias("adalias")
- selectquery = users.outerjoin(adalias).select(
- use_labels=True, order_by=[users.c.id, adalias.c.id]
- )
-
- # expression.Alias object
- def go():
- result = list(
- q.options(
- contains_eager("addresses", alias=adalias)
- ).instances(selectquery.execute())
- )
- assert self.static.user_address_result == result
-
- self.assert_sql_count(testing.db, go, 1)
-
def test_contains_eager_aliased(self):
User, Address = self.classes.User, self.classes.Address
self.assert_sql_count(testing.db, go, 1)
- def test_contains_eager_multi_string_alias(self):
- orders, items, users, order_items, User = (
- self.tables.orders,
- self.tables.items,
- self.tables.users,
- self.tables.order_items,
- self.classes.User,
- )
-
- sess = create_session()
- q = sess.query(User)
-
- oalias = orders.alias("o1")
- ialias = items.alias("i1")
- query = (
- users.outerjoin(oalias)
- .outerjoin(order_items)
- .outerjoin(ialias)
- .select(use_labels=True)
- .order_by(users.c.id, oalias.c.id, ialias.c.id)
- )
-
- # test using string alias with more than one level deep
- def go():
- result = list(
- q.options(
- contains_eager("orders", alias="o1"),
- contains_eager("orders.items", alias="i1"),
- ).instances(query.execute())
- )
- assert self.static.user_order_result == result
-
- self.assert_sql_count(testing.db, go, 1)
-
def test_contains_eager_multi_alias(self):
orders, items, users, order_items, User = (
self.tables.orders,
q.options(
contains_eager("orders", alias=oalias),
contains_eager("orders.items", alias=ialias),
- ).instances(query.execute())
+ ).from_statement(query)
)
assert self.static.user_order_result == result
use_labels=True, order_by=[users.c.id, addresses.c.id]
)
eq_(
- list(sess.query(User, Address).instances(selectquery.execute())),
+ list(sess.query(User, Address).from_statement(selectquery)),
expected,
)
sess.expunge_all()
sess.expunge_all()
# TODO: figure out why group_by(users) doesn't work here
+ count = func.count(addresses.c.id).label("count")
s = (
- select([users, func.count(addresses.c.id).label("count")])
+ select([users, count])
.select_from(users.outerjoin(addresses))
.group_by(*[c for c in users.c])
.order_by(User.id)
# session's identity map)
r = users.select().order_by(users.c.id).execute()
+ ctx = sess.query(User)._compile_context()
+
def go():
- result = list(sess.query(User).instances(r))
+ result = list(sess.query(User).instances(r, ctx))
eq_(result, self.static.user_address_result)
self.sql_count_(4, go)
# then assert the data, which will launch 6 more lazy loads
r = users.select().execute()
+ ctx = sess.query(User)._compile_context()
+
def go():
- result = list(sess.query(User).instances(r))
+ result = list(sess.query(User).instances(r, ctx))
eq_(result, self.static.user_all_result)
self.assert_sql_count(testing.db, go, 6)
eq_(row.User, row[0])
eq_(row.orders, row[1])
- # test here that first col is not labeled, only
- # one name in keys, matches correctly
for row in sess.query(User.name + "hoho", User.name):
eq_(list(row.keys()), ["name"])
eq_(row[0], row.name + "hoho")
from sqlalchemy.orm import backref
from sqlalchemy.orm import Bundle
from sqlalchemy.orm import column_property
+from sqlalchemy.orm import contains_eager
from sqlalchemy.orm import create_session
from sqlalchemy.orm import defer
from sqlalchemy.orm import joinedload
from sqlalchemy.testing.assertsql import CompiledSQL
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
+from sqlalchemy.util import collections_abc
from test.orm import _fixtures
def test_single_entity_true(self):
User = self.classes.User
row = create_session().query(User).only_return_tuples(True).first()
- assert isinstance(row, tuple)
+ assert isinstance(row, collections_abc.Sequence)
def test_multiple_entity_false(self):
User = self.classes.User
.only_return_tuples(False)
.first()
)
- assert isinstance(row, tuple)
+ assert isinstance(row, collections_abc.Sequence)
def test_multiple_entity_true(self):
User = self.classes.User
.only_return_tuples(True)
.first()
)
- assert isinstance(row, tuple)
+ assert isinstance(row, collections_abc.Sequence)
class RowTupleTest(QueryTest):
)
result = list(
- session.query(User).instances(s.execute(emailad="jack@bean.com"))
+ session.query(User)
+ .params(emailad="jack@bean.com")
+ .from_statement(s)
)
eq_([User(id=7)], result)
class TextTest(QueryTest, AssertsCompiledSQL):
__dialect__ = "default"
- def test_fulltext(self):
+ def test_needs_text(self):
User = self.classes.User
assert_raises_message(
"select * from users order by id",
)
+ def test_select_star(self):
+ User = self.classes.User
+
eq_(
create_session()
.query(User)
None,
)
+ def test_columns_mismatched(self):
+ # test that columns using column._label match, as well as that
+ # ordering doesn't matter
+ User = self.classes.User
+
+ s = create_session()
+ q = s.query(User).from_statement(
+ text(
+ "select name, 27 as foo, id as users_id from users order by id"
+ )
+ )
+ eq_(
+ q.all(),
+ [
+ User(id=7, name="jack"),
+ User(id=8, name="ed"),
+ User(id=9, name="fred"),
+ User(id=10, name="chuck"),
+ ],
+ )
+
+ def test_columns_multi_table_uselabels(self):
+ # test that columns using column._label match, as well as that
+ # ordering doesn't matter.
+ User = self.classes.User
+ Address = self.classes.Address
+
+ s = create_session()
+ q = s.query(User, Address).from_statement(
+ text(
+ "select users.name AS users_name, users.id AS users_id, "
+ "addresses.id AS addresses_id FROM users JOIN addresses "
+ "ON users.id = addresses.user_id WHERE users.id=8 "
+ "ORDER BY addresses.id"
+ )
+ )
+
+ eq_(
+ q.all(),
+ [
+ (User(id=8), Address(id=2)),
+ (User(id=8), Address(id=3)),
+ (User(id=8), Address(id=4)),
+ ],
+ )
+
+ def test_columns_multi_table_uselabels_contains_eager(self):
+ # test that columns using column._label match, as well as that
+ # ordering doesn't matter.
+ User = self.classes.User
+ Address = self.classes.Address
+
+ s = create_session()
+ q = (
+ s.query(User)
+ .from_statement(
+ text(
+ "select users.name AS users_name, users.id AS users_id, "
+ "addresses.id AS addresses_id FROM users JOIN addresses "
+ "ON users.id = addresses.user_id WHERE users.id=8 "
+ "ORDER BY addresses.id"
+ )
+ )
+ .options(contains_eager(User.addresses))
+ )
+
+ def go():
+ r = q.all()
+ eq_(r[0].addresses, [Address(id=2), Address(id=3), Address(id=4)])
+
+ self.assert_sql_count(testing.db, go, 1)
+
+ def test_other_eager_loads(self):
+ # this is new in 1.4. with textclause, we build up column loaders
+ # normally, so that eager loaders also get installed. previously,
+ # _compile_context() didn't build up column loaders and attempted
+ # to get them after the fact.
+ User = self.classes.User
+
+ s = create_session()
+ q = (
+ s.query(User)
+ .from_statement(text("select * from users ORDER BY users.id"))
+ .options(subqueryload(User.addresses))
+ )
+
+ def go():
+ eq_(q.all(), self.static.user_address_result)
+
+ self.assert_sql_count(testing.db, go, 2)
+
def test_whereclause(self):
User = self.classes.User
"id in (:id1, :id2)",
)
- def test_as_column(self):
+ def test_plain_textual_column(self):
User = self.classes.User
s = create_session()
# TEST: test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation
-test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation 2.7_postgresql_psycopg2_dbapiunicode_cextensions 6412,322,3969,13151,1340,2187,2770
-test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation 2.7_postgresql_psycopg2_dbapiunicode_nocextensions 6429,322,3969,13149,1341,2187,2766
-test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation 3.7_postgresql_psycopg2_dbapiunicode_cextensions 6177,306,3889,12597,1233,2133,2650
-test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation 3.7_postgresql_psycopg2_dbapiunicode_nocextensions 6260,306,3969,13203,1344,2151,2840
+test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation 2.7_postgresql_psycopg2_dbapiunicode_cextensions 6412,322,4242,13151,1340,2187,2770
+test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation 2.7_postgresql_psycopg2_dbapiunicode_nocextensions 6429,322,4242,13149,1341,2187,2766
+test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation 3.7_postgresql_psycopg2_dbapiunicode_cextensions 6177,306,4162,12597,1233,2133,2650
+test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation 3.7_postgresql_psycopg2_dbapiunicode_nocextensions 6260,306,4242,13203,1344,2151,2840
# TEST: test.aaa_profiling.test_zoomark_orm.ZooMarkTest.test_invocation
eq_(
comp._create_result_map(),
{
- "a": ("a", (t.c.a, "a", "a"), t.c.a.type),
- "b": ("b", (t.c.b, "b", "b"), t.c.b.type),
+ "a": ("a", (t.c.a, "a", "a", "t_a"), t.c.a.type),
+ "b": ("b", (t.c.b, "b", "b", "t_b"), t.c.b.type),
},
)
comp = stmt.compile()
eq_(
comp._create_result_map(),
- {"a": ("a", (t.c.a, "a", "a"), t.c.a.type)},
+ {"a": ("a", (t.c.a, "a", "a", "t_a"), t.c.a.type)},
)
def test_compound_only_top_populates(self):
comp = stmt.compile()
eq_(
comp._create_result_map(),
- {"a": ("a", (t.c.a, "a", "a"), t.c.a.type)},
+ {"a": ("a", (t.c.a, "a", "a", "t_a"), t.c.a.type)},
)
def test_label_plus_element(self):
eq_(
comp._create_result_map(),
{
- "a": ("a", (t.c.a, "a", "a"), t.c.a.type),
+ "a": ("a", (t.c.a, "a", "a", "t_a"), t.c.a.type),
"bar": ("bar", (l1, "bar"), l1.type),
"anon_1": (
tc.anon_label,
comp = stmt.compile(dialect=postgresql.dialect())
eq_(
comp._create_result_map(),
- {"a": ("a", (aint, "a", "a"), aint.type)},
+ {"a": ("a", (aint, "a", "a", "t2_a"), aint.type)},
)
def test_insert_from_select(self):
comp = stmt.compile(dialect=postgresql.dialect())
eq_(
comp._create_result_map(),
- {"a": ("a", (aint, "a", "a"), aint.type)},
+ {"a": ("a", (aint, "a", "a", "t2_a"), aint.type)},
)
def test_nested_api(self):
{
"otherid": (
"otherid",
- (table2.c.otherid, "otherid", "otherid"),
+ (
+ table2.c.otherid,
+ "otherid",
+ "otherid",
+ "myothertable_otherid",
+ ),
table2.c.otherid.type,
),
"othername": (
"othername",
- (table2.c.othername, "othername", "othername"),
+ (
+ table2.c.othername,
+ "othername",
+ "othername",
+ "myothertable_othername",
+ ),
table2.c.othername.type,
),
"k1": ("k1", (1, 2, 3), int_),
{
"myid": (
"myid",
- (table1.c.myid, "myid", "myid"),
+ (table1.c.myid, "myid", "myid", "mytable_myid"),
table1.c.myid.type,
),
"k2": ("k2", (3, 4, 5), int_),
"name": (
"name",
- (table1.c.name, "name", "name"),
+ (table1.c.name, "name", "name", "mytable_name"),
table1.c.name.type,
),
"description": (
"description",
- (table1.c.description, "description", "description"),
+ (
+ table1.c.description,
+ "description",
+ "description",
+ "mytable_description",
+ ),
table1.c.description.type,
),
},
from sqlalchemy import alias
from sqlalchemy import bindparam
-from sqlalchemy import Column
+from sqlalchemy import CHAR
from sqlalchemy import column
from sqlalchemy import create_engine
from sqlalchemy import exc
from sqlalchemy import ForeignKey
from sqlalchemy import func
+from sqlalchemy import INT
from sqlalchemy import Integer
from sqlalchemy import join
from sqlalchemy import literal_column
from sqlalchemy import select
from sqlalchemy import sql
from sqlalchemy import String
-from sqlalchemy import Table
from sqlalchemy import table
from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy import util
+from sqlalchemy import VARCHAR
from sqlalchemy.engine import default
from sqlalchemy.schema import DDL
from sqlalchemy.sql import coercions
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
+from sqlalchemy.testing import in_
from sqlalchemy.testing import is_true
from sqlalchemy.testing import mock
+from sqlalchemy.testing import not_in_
+from sqlalchemy.testing.schema import Column
+from sqlalchemy.testing.schema import Table
class DeprecationWarningsTest(fixtures.TestBase):
{
"myid": (
"myid",
- (table1.c.myid, "myid", "myid"),
+ (table1.c.myid, "myid", "myid", "mytable_myid"),
table1.c.myid.type,
)
},
{
"myid": (
"myid",
- (table1.c.myid, "myid", "myid"),
+ (table1.c.myid, "myid", "myid", "mytable_myid"),
table1.c.myid.type,
)
},
with self._expect_deprecated("Select", "from", "select_from"):
stmt.append_from(t1.join(t2, t1.c.q == t2.c.q))
self.assert_compile(stmt, "SELECT t1.q FROM t1 JOIN t2 ON t1.q = t2.q")
+
+
+class KeyTargetingTest(fixtures.TablesTest):
+ run_inserts = "once"
+ run_deletes = None
+ __backend__ = True
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "keyed1",
+ metadata,
+ Column("a", CHAR(2), key="b"),
+ Column("c", CHAR(2), key="q"),
+ )
+ Table("keyed2", metadata, Column("a", CHAR(2)), Column("b", CHAR(2)))
+ Table("keyed3", metadata, Column("a", CHAR(2)), Column("d", CHAR(2)))
+ Table("keyed4", metadata, Column("b", CHAR(2)), Column("q", CHAR(2)))
+ Table("content", metadata, Column("t", String(30), key="type"))
+ Table("bar", metadata, Column("ctype", String(30), key="content_type"))
+
+ if testing.requires.schemas.enabled:
+ Table(
+ "wschema",
+ metadata,
+ Column("a", CHAR(2), key="b"),
+ Column("c", CHAR(2), key="q"),
+ schema=testing.config.test_schema,
+ )
+
+ @classmethod
+ def insert_data(cls):
+ cls.tables.keyed1.insert().execute(dict(b="a1", q="c1"))
+ cls.tables.keyed2.insert().execute(dict(a="a2", b="b2"))
+ cls.tables.keyed3.insert().execute(dict(a="a3", d="d3"))
+ cls.tables.keyed4.insert().execute(dict(b="b4", q="q4"))
+ cls.tables.content.insert().execute(type="t1")
+
+ if testing.requires.schemas.enabled:
+ cls.tables[
+ "%s.wschema" % testing.config.test_schema
+ ].insert().execute(dict(b="a1", q="c1"))
+
+ def test_column_label_overlap_fallback(self):
+ content, bar = self.tables.content, self.tables.bar
+ row = testing.db.execute(
+ select([content.c.type.label("content_type")])
+ ).first()
+
+ not_in_(content.c.type, row)
+ not_in_(bar.c.content_type, row)
+
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "with only matching names"
+ ):
+ in_(sql.column("content_type"), row)
+
+ row = testing.db.execute(
+ select([func.now().label("content_type")])
+ ).first()
+ not_in_(content.c.type, row)
+ not_in_(bar.c.content_type, row)
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "with only matching names"
+ ):
+ in_(sql.column("content_type"), row)
+
+ def test_columnclause_schema_column_one(self):
+ keyed2 = self.tables.keyed2
+
+ # this is addressed by [ticket:2932]
+ # ColumnClause._compare_name_for_result allows the
+ # columns which the statement is against to be lightweight
+ # cols, which results in a more liberal comparison scheme
+ a, b = sql.column("a"), sql.column("b")
+ stmt = select([a, b]).select_from(table("keyed2"))
+ row = testing.db.execute(stmt).first()
+
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "with only matching names"
+ ):
+ in_(keyed2.c.a, row)
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "with only matching names"
+ ):
+ in_(keyed2.c.b, row)
+
+ def test_columnclause_schema_column_two(self):
+ keyed2 = self.tables.keyed2
+
+ a, b = sql.column("a"), sql.column("b")
+ stmt = select([keyed2.c.a, keyed2.c.b])
+ row = testing.db.execute(stmt).first()
+
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "with only matching names"
+ ):
+ in_(a, row)
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "with only matching names"
+ ):
+ in_(b, row)
+
+ def test_columnclause_schema_column_three(self):
+ keyed2 = self.tables.keyed2
+
+ # originally addressed by [ticket:2932], however liberalized
+ # Column-targeting rules are deprecated
+
+ a, b = sql.column("a"), sql.column("b")
+ stmt = text("select a, b from keyed2").columns(a=CHAR, b=CHAR)
+ row = testing.db.execute(stmt).first()
+
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "with only matching names"
+ ):
+ in_(keyed2.c.a, row)
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "with only matching names"
+ ):
+ in_(keyed2.c.b, row)
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "with only matching names"
+ ):
+ in_(a, row)
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "with only matching names"
+ ):
+ in_(b, row)
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "with only matching names",
+ "The SelectBase.c and SelectBase.columns",
+ ):
+ in_(stmt.c.a, row)
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "with only matching names",
+ "The SelectBase.c and SelectBase.columns",
+ ):
+ in_(stmt.c.b, row)
+
+ def test_columnclause_schema_column_four(self):
+ keyed2 = self.tables.keyed2
+
+ # this is also addressed by [ticket:2932]
+
+ a, b = sql.column("keyed2_a"), sql.column("keyed2_b")
+ stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(
+ a, b
+ )
+ row = testing.db.execute(stmt).first()
+
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "with only matching names"
+ ):
+ in_(keyed2.c.a, row)
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "with only matching names"
+ ):
+ in_(keyed2.c.b, row)
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "with only matching names",
+ "The SelectBase.c and SelectBase.columns",
+ ):
+ in_(stmt.c.keyed2_a, row)
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "with only matching names",
+ "The SelectBase.c and SelectBase.columns",
+ ):
+ in_(stmt.c.keyed2_b, row)
+
+ def test_columnclause_schema_column_five(self):
+ keyed2 = self.tables.keyed2
+
+ # this is also addressed by [ticket:2932]
+
+ stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(
+ keyed2_a=CHAR, keyed2_b=CHAR
+ )
+ row = testing.db.execute(stmt).first()
+
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "with only matching names"
+ ):
+ in_(keyed2.c.a, row)
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "with only matching names"
+ ):
+ in_(keyed2.c.b, row)
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "with only matching names",
+ "The SelectBase.c and SelectBase.columns",
+ ):
+ in_(stmt.c.keyed2_a, row)
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "with only matching names",
+ "The SelectBase.c and SelectBase.columns",
+ ):
+ in_(stmt.c.keyed2_b, row)
+
+
+class ResultProxyTest(fixtures.TablesTest):
+ __backend__ = True
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "users",
+ metadata,
+ Column(
+ "user_id", INT, primary_key=True, test_needs_autoincrement=True
+ ),
+ Column("user_name", VARCHAR(20)),
+ test_needs_acid=True,
+ )
+ Table(
+ "addresses",
+ metadata,
+ Column(
+ "address_id",
+ Integer,
+ primary_key=True,
+ test_needs_autoincrement=True,
+ ),
+ Column("user_id", Integer, ForeignKey("users.user_id")),
+ Column("address", String(30)),
+ test_needs_acid=True,
+ )
+
+ Table(
+ "users2",
+ metadata,
+ Column("user_id", INT, primary_key=True),
+ Column("user_name", VARCHAR(20)),
+ test_needs_acid=True,
+ )
+
+ @classmethod
+ def insert_data(cls):
+ users = cls.tables.users
+
+ with testing.db.connect() as conn:
+ conn.execute(
+ users.insert(),
+ dict(user_id=1, user_name="john"),
+ dict(user_id=2, user_name="jack"),
+ )
+
+ def test_column_accessor_textual_select(self):
+ users = self.tables.users
+
+ # this will create column() objects inside
+ # the select(), these need to match on name anyway
+ r = testing.db.execute(
+ select([column("user_id"), column("user_name")])
+ .select_from(table("users"))
+ .where(text("user_id=2"))
+ ).first()
+
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "with only matching names"
+ ):
+ eq_(r[users.c.user_id], 2)
+
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "with only matching names"
+ ):
+ eq_(r[users.c.user_name], "jack")
+
+ def test_column_accessor_basic_text(self):
+ users = self.tables.users
+
+ r = testing.db.execute(
+ text("select * from users where user_id=2")
+ ).first()
+
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "with only matching names"
+ ):
+ eq_(r[users.c.user_id], 2)
+
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "with only matching names"
+ ):
+ eq_(r[users.c.user_name], "jack")
+
+ @testing.provide_metadata
+ def test_column_label_overlap_fallback(self):
+ content = Table("content", self.metadata, Column("type", String(30)))
+ bar = Table("bar", self.metadata, Column("content_type", String(30)))
+ self.metadata.create_all(testing.db)
+ testing.db.execute(content.insert().values(type="t1"))
+
+ row = testing.db.execute(content.select(use_labels=True)).first()
+ in_(content.c.type, row)
+ not_in_(bar.c.content_type, row)
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "with only matching names"
+ ):
+ in_(sql.column("content_type"), row)
+
+ row = testing.db.execute(
+ select([content.c.type.label("content_type")])
+ ).first()
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "with only matching names"
+ ):
+ in_(content.c.type, row)
+
+ not_in_(bar.c.content_type, row)
+
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "with only matching names"
+ ):
+ in_(sql.column("content_type"), row)
+
+ row = testing.db.execute(
+ select([func.now().label("content_type")])
+ ).first()
+
+ not_in_(content.c.type, row)
+
+ not_in_(bar.c.content_type, row)
+
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "with only matching names"
+ ):
+ in_(sql.column("content_type"), row)
+
+ def test_pickled_rows(self):
+ users = self.tables.users
+ addresses = self.tables.addresses
+ with testing.db.connect() as conn:
+ conn.execute(users.delete())
+ conn.execute(
+ users.insert(),
+ {"user_id": 7, "user_name": "jack"},
+ {"user_id": 8, "user_name": "ed"},
+ {"user_id": 9, "user_name": "fred"},
+ )
+
+ for pickle in False, True:
+ for use_labels in False, True:
+ result = (
+ users.select(use_labels=use_labels)
+ .order_by(users.c.user_id)
+ .execute()
+ .fetchall()
+ )
+
+ if pickle:
+ result = util.pickle.loads(util.pickle.dumps(result))
+
+ if pickle:
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "from a row that was unpickled"
+ ):
+ eq_(result[0][users.c.user_id], 7)
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "from a row that was unpickled"
+ ):
+ eq_(result[0][users.c.user_name], "jack")
+
+ if not pickle or use_labels:
+ assert_raises(
+ exc.NoSuchColumnError,
+ lambda: result[0][addresses.c.user_id],
+ )
+ else:
+ # test with a different table. name resolution is
+ # causing 'user_id' to match when use_labels wasn't used.
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "from a row that was unpickled"
+ ):
+ eq_(result[0][addresses.c.user_id], 7)
+
+ assert_raises(
+ exc.NoSuchColumnError,
+ lambda: result[0][addresses.c.address_id],
+ )
+
+
+class PositionalTextTest(fixtures.TablesTest):
+ run_inserts = "once"
+ run_deletes = None
+ __backend__ = True
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "text1",
+ metadata,
+ Column("a", CHAR(2)),
+ Column("b", CHAR(2)),
+ Column("c", CHAR(2)),
+ Column("d", CHAR(2)),
+ )
+
+ @classmethod
+ def insert_data(cls):
+ cls.tables.text1.insert().execute(
+ [dict(a="a1", b="b1", c="c1", d="d1")]
+ )
+
+ def test_anon_aliased_overlapping(self):
+ text1 = self.tables.text1
+
+ c1 = text1.c.a.label(None)
+ c2 = text1.alias().c.a
+ c3 = text1.alias().c.a.label(None)
+ c4 = text1.c.a.label(None)
+
+ stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4)
+ result = testing.db.execute(stmt)
+ row = result.first()
+
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "with only matching names"
+ ):
+ eq_(row[text1.c.a], "a1")
+
+ def test_anon_aliased_unique(self):
+ text1 = self.tables.text1
+
+ c1 = text1.c.a.label(None)
+ c2 = text1.alias().c.c
+ c3 = text1.alias().c.b
+ c4 = text1.alias().c.d.label(None)
+
+ stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4)
+ result = testing.db.execute(stmt)
+ row = result.first()
+
+ eq_(row[c1], "a1")
+ eq_(row[c2], "b1")
+ eq_(row[c3], "c1")
+ eq_(row[c4], "d1")
+
+ # key fallback rules still match this to a column
+ # unambiguously based on its name
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "with only matching names"
+ ):
+ eq_(row[text1.c.a], "a1")
+
+ # key fallback rules still match this to a column
+ # unambiguously based on its name
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "with only matching names"
+ ):
+ eq_(row[text1.c.d], "d1")
+
+ # text1.c.b goes nowhere....because we hit key fallback
+ # but the text1.c.b doesn't derive from text1.c.c
+ assert_raises_message(
+ exc.NoSuchColumnError,
+ "Could not locate column in row for column 'text1.b'",
+ lambda: row[text1.c.b],
+ )
from sqlalchemy.engine import default
from sqlalchemy.engine import result as _result
from sqlalchemy.engine import Row
+from sqlalchemy.sql.selectable import TextualSelect
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import assertions
row = testing.db.execute(content.select(use_labels=True)).first()
in_(content.c.type, row)
not_in_(bar.c.content_type, row)
- in_(sql.column("content_type"), row)
-
- row = testing.db.execute(
- select([content.c.type.label("content_type")])
- ).first()
- in_(content.c.type, row)
-
- not_in_(bar.c.content_type, row)
-
- in_(sql.column("content_type"), row)
row = testing.db.execute(
select([func.now().label("content_type")])
).first()
- not_in_(content.c.type, row)
+ not_in_(content.c.type, row)
not_in_(bar.c.content_type, row)
- in_(sql.column("content_type"), row)
-
def test_pickled_rows(self):
users = self.tables.users
- addresses = self.tables.addresses
users.insert().execute(
{"user_id": 7, "user_name": "jack"},
eq_(list(result[0].keys()), ["user_id", "user_name"])
eq_(result[0][0], 7)
- eq_(result[0][users.c.user_id], 7)
- eq_(result[0][users.c.user_name], "jack")
-
- if not pickle or use_labels:
- assert_raises(
- exc.NoSuchColumnError,
- lambda: result[0][addresses.c.user_id],
- )
- else:
- # test with a different table. name resolution is
- # causing 'user_id' to match when use_labels wasn't used.
- eq_(result[0][addresses.c.user_id], 7)
assert_raises(
exc.NoSuchColumnError, lambda: result[0]["fake key"]
)
- assert_raises(
- exc.NoSuchColumnError,
- lambda: result[0][addresses.c.address_id],
- )
def test_column_error_printing(self):
result = testing.db.execute(select([1]))
eq_(r.user_id, 2)
eq_(r["user_id"], 2)
- eq_(r[users.c.user_id], 2)
eq_(r.user_name, "jack")
eq_(r["user_name"], "jack")
- eq_(r[users.c.user_name], "jack")
def test_column_accessor_textual_select(self):
users = self.tables.users
eq_(r.user_id, 2)
eq_(r["user_id"], 2)
- eq_(r[users.c.user_id], 2)
eq_(r.user_name, "jack")
eq_(r["user_name"], "jack")
- eq_(r[users.c.user_name], "jack")
def test_column_accessor_dotted_union(self):
users = self.tables.users
set([True]),
)
+ def test_loose_matching_one(self):
+ users = self.tables.users
+ addresses = self.tables.addresses
+
+ with testing.db.connect() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "john"})
+ conn.execute(
+ addresses.insert(),
+ {"address_id": 1, "user_id": 1, "address": "email"},
+ )
+
+ # use some column labels in the SELECT
+ result = conn.execute(
+ TextualSelect(
+ text(
+ "select users.user_name AS users_user_name, "
+ "users.user_id AS user_id, "
+ "addresses.address_id AS address_id "
+ "FROM users JOIN addresses "
+ "ON users.user_id = addresses.user_id "
+ "WHERE users.user_id=1 "
+ ),
+ [
+ users.c.user_id,
+ users.c.user_name,
+ addresses.c.address_id,
+ ],
+ positional=False,
+ )
+ )
+ row = result.first()
+ eq_(row[users.c.user_id], 1)
+ eq_(row[users.c.user_name], "john")
+
+ def test_loose_matching_two(self):
+ users = self.tables.users
+ addresses = self.tables.addresses
+
+ with testing.db.connect() as conn:
+ # MARKMARK
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "john"})
+ conn.execute(
+ addresses.insert(),
+ {"address_id": 1, "user_id": 1, "address": "email"},
+ )
+
+ # use some column labels in the SELECT
+ result = conn.execute(
+ TextualSelect(
+ text(
+ "select users.user_name AS users_user_name, "
+ "users.user_id AS user_id, "
+ "addresses.user_id "
+ "FROM users JOIN addresses "
+ "ON users.user_id = addresses.user_id "
+ "WHERE users.user_id=1 "
+ ),
+ [users.c.user_id, users.c.user_name, addresses.c.user_id],
+ positional=False,
+ )
+ )
+ row = result.first()
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "Ambiguous column name",
+ lambda: row[users.c.user_id],
+ )
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "Ambiguous column name",
+ lambda: row[addresses.c.user_id],
+ )
+ eq_(row[users.c.user_name], "john")
+
def test_ambiguous_column_by_col_plus_label(self):
users = self.tables.users
assert_raises(KeyError, lambda: row["keyed2_c"])
assert_raises(KeyError, lambda: row["keyed2_q"])
- def test_column_label_overlap_fallback(self):
- content, bar = self.tables.content, self.tables.bar
- row = testing.db.execute(
- select([content.c.type.label("content_type")])
- ).first()
-
- not_in_(content.c.type, row)
- not_in_(bar.c.content_type, row)
-
- in_(sql.column("content_type"), row)
-
- row = testing.db.execute(
- select([func.now().label("content_type")])
- ).first()
- not_in_(content.c.type, row)
- not_in_(bar.c.content_type, row)
- in_(sql.column("content_type"), row)
-
- def test_column_label_overlap_fallback_2(self):
- content, bar = self.tables.content, self.tables.bar
- row = testing.db.execute(content.select(use_labels=True)).first()
- in_(content.c.type, row)
- not_in_(bar.c.content_type, row)
- not_in_(sql.column("content_type"), row)
-
def test_columnclause_schema_column_one(self):
- keyed2 = self.tables.keyed2
-
- # this is addressed by [ticket:2932]
- # ColumnClause._compare_name_for_result allows the
- # columns which the statement is against to be lightweight
- # cols, which results in a more liberal comparison scheme
+ # originally addressed by [ticket:2932], however liberalized
+ # Column-targeting rules are deprecated
a, b = sql.column("a"), sql.column("b")
stmt = select([a, b]).select_from(table("keyed2"))
row = testing.db.execute(stmt).first()
- in_(keyed2.c.a, row)
- in_(keyed2.c.b, row)
in_(a, row)
in_(b, row)
def test_columnclause_schema_column_two(self):
keyed2 = self.tables.keyed2
- a, b = sql.column("a"), sql.column("b")
stmt = select([keyed2.c.a, keyed2.c.b])
row = testing.db.execute(stmt).first()
in_(keyed2.c.a, row)
in_(keyed2.c.b, row)
- in_(a, row)
- in_(b, row)
def test_columnclause_schema_column_three(self):
- keyed2 = self.tables.keyed2
-
# this is also addressed by [ticket:2932]
- a, b = sql.column("a"), sql.column("b")
stmt = text("select a, b from keyed2").columns(a=CHAR, b=CHAR)
row = testing.db.execute(stmt).first()
- in_(keyed2.c.a, row)
- in_(keyed2.c.b, row)
- in_(a, row)
- in_(b, row)
in_(stmt.selected_columns.a, row)
in_(stmt.selected_columns.b, row)
def test_columnclause_schema_column_four(self):
- keyed2 = self.tables.keyed2
-
- # this is also addressed by [ticket:2932]
+ # originally addressed by [ticket:2932], however liberalized
+ # Column-targeting rules are deprecated
a, b = sql.column("keyed2_a"), sql.column("keyed2_b")
stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(
)
row = testing.db.execute(stmt).first()
- in_(keyed2.c.a, row)
- in_(keyed2.c.b, row)
in_(a, row)
in_(b, row)
in_(stmt.selected_columns.keyed2_a, row)
in_(stmt.selected_columns.keyed2_b, row)
def test_columnclause_schema_column_five(self):
- keyed2 = self.tables.keyed2
-
# this is also addressed by [ticket:2932]
stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(
)
row = testing.db.execute(stmt).first()
- in_(keyed2.c.a, row)
- in_(keyed2.c.b, row)
in_(stmt.selected_columns.keyed2_a, row)
in_(stmt.selected_columns.keyed2_b, row)
eq_(row[c3], "c1")
eq_(row[c4], "d1")
- # key fallback rules still match this to a column
- # unambiguously based on its name
- eq_(row[text1.c.a], "a1")
-
- # key fallback rules still match this to a column
- # unambiguously based on its name
- eq_(row[text1.c.d], "d1")
-
# text1.c.b goes nowhere....because we hit key fallback
# but the text1.c.b doesn't derive from text1.c.c
assert_raises_message(
eq_(row[c3], "c1")
eq_(row[c4], "d1")
- # key fallback rules still match this to a column
- # unambiguously based on its name
- eq_(row[text1.c.a], "a1")
-
def test_anon_aliased_name_conflict(self):
text1 = self.tables.text1
{
"id": (
"id",
- (t.selected_columns.id, "id", "id"),
+ (t.selected_columns.id, "id", "id", "id"),
t.selected_columns.id.type,
),
"name": (
"name",
- (t.selected_columns.name, "name", "name"),
+ (t.selected_columns.name, "name", "name", "name"),
t.selected_columns.name.type,
),
},
{
"id": (
"id",
- (t.selected_columns.id, "id", "id"),
+ (t.selected_columns.id, "id", "id", "id"),
t.selected_columns.id.type,
),
"name": (
"name",
- (t.selected_columns.name, "name", "name"),
+ (t.selected_columns.name, "name", "name", "name"),
t.selected_columns.name.type,
),
},
{
"myid": (
"myid",
- (table1.c.myid, "myid", "myid"),
+ (table1.c.myid, "myid", "myid", "mytable_myid"),
table1.c.myid.type,
)
},