except KeyError:
assert False, "statement had 'orm' plugin but no plugin_subject"
else:
- bind_arguments["mapper"] = plugin_subject.mapper
+ if plugin_subject:
+ bind_arguments["mapper"] = plugin_subject.mapper
if load_options._autoflush:
session._autoflush()
self._polymorphic_adapters = {}
self._no_yield_pers = set()
- _QueryEntity.to_compile_state(self, statement_container._raw_columns)
-
self.compile_options = statement_container._compile_options
+ _QueryEntity.to_compile_state(self, statement_container._raw_columns)
+
self.current_path = statement_container._compile_options._current_path
if toplevel and statement_container._with_options:
)
self._setup_with_polymorphics()
- _QueryEntity.to_compile_state(self, select_statement._raw_columns)
-
self.compile_options = select_statement._compile_options
+ _QueryEntity.to_compile_state(self, select_statement._raw_columns)
+
# determine label style. we can make different decisions here.
# at the moment, trying to see if we can always use DISAMBIGUATE_ONLY
# rather than LABEL_STYLE_NONE, and if we can use disambiguate style
)
self.supports_single_entity = self.bundle.single_entity
+ if (
+ self.supports_single_entity
+ and not compile_state.compile_options._use_legacy_query_style
+ ):
+ util.warn_deprecated_20(
+ "The Bundle.single_entity flag has no effect when "
+ "using 2.0 style execution."
+ )
@property
def mapper(self):
# ensure existing entity_namespace remains
annotations = {"bundle": self, "entity_namespace": self}
annotations.update(self._annotations)
- return expression.ClauseList(
- _literal_as_text_role=roles.ColumnsClauseRole,
- group=False,
- *[e._annotations.get("bundle", e) for e in self.exprs]
- )._annotate(annotations)
+
+ plugin_subject = self.exprs[0]._propagate_attrs.get(
+ "plugin_subject", self.entity
+ )
+ return (
+ expression.ClauseList(
+ _literal_as_text_role=roles.ColumnsClauseRole,
+ group=False,
+ *[e._annotations.get("bundle", e) for e in self.exprs]
+ )
+ ._annotate(annotations)
+ ._set_propagate_attrs(
+ # the Bundle *must* use the orm plugin no matter what. the
+ # subject can be None but it's much better if it's not.
+ {
+ "compile_state_plugin": "orm",
+ "plugin_subject": plugin_subject,
+ }
+ )
+ )
@property
def clauses(self):
],
)
- def test_single_entity(self):
+ def test_single_entity_legacy_query(self):
Data = self.classes.Data
sess = Session()
[("d3d1", "d3d2"), ("d4d1", "d4d2"), ("d5d1", "d5d2")],
)
- def test_single_entity_future(self):
+ def test_labeled_cols_non_single_entity_legacy_query(self):
+ Data = self.classes.Data
+ sess = Session()
+
+ b1 = Bundle("b1", Data.d1.label("x"), Data.d2.label("y"))
+
+ eq_(
+ sess.query(b1).filter(b1.c.x.between("d3d1", "d5d1")).all(),
+ [(("d3d1", "d3d2"),), (("d4d1", "d4d2"),), (("d5d1", "d5d2"),)],
+ )
+
+ def test_labeled_cols_single_entity_legacy_query(self):
+ Data = self.classes.Data
+ sess = Session()
+
+ b1 = Bundle(
+ "b1", Data.d1.label("x"), Data.d2.label("y"), single_entity=True
+ )
+
+ eq_(
+ sess.query(b1).filter(b1.c.x.between("d3d1", "d5d1")).all(),
+ [("d3d1", "d3d2"), ("d4d1", "d4d2"), ("d5d1", "d5d2")],
+ )
+
+ def test_labeled_cols_as_rows_future(self):
+ Data = self.classes.Data
+ sess = Session()
+
+ b1 = Bundle("b1", Data.d1.label("x"), Data.d2.label("y"))
+
+ stmt = select(b1).filter(b1.c.x.between("d3d1", "d5d1"))
+
+ eq_(
+ sess.execute(stmt).all(),
+ [(("d3d1", "d3d2"),), (("d4d1", "d4d2"),), (("d5d1", "d5d2"),)],
+ )
+
+ def test_labeled_cols_as_scalars_future(self):
+ Data = self.classes.Data
+ sess = Session()
+
+ b1 = Bundle("b1", Data.d1.label("x"), Data.d2.label("y"))
+
+ stmt = select(b1).filter(b1.c.x.between("d3d1", "d5d1"))
+ eq_(
+ sess.execute(stmt).scalars().all(),
+ [("d3d1", "d3d2"), ("d4d1", "d4d2"), ("d5d1", "d5d2")],
+ )
+
+ def test_single_entity_flag_is_legacy_w_future(self):
Data = self.classes.Data
sess = Session(testing.db, future=True)
+ # flag has no effect
b1 = Bundle("b1", Data.d1, Data.d2, single_entity=True)
+ stmt = select(b1).filter(b1.c.d1.between("d3d1", "d5d1"))
+
+ with testing.expect_deprecated_20(
+ "The Bundle.single_entity flag has no effect when "
+ "using 2.0 style execution."
+ ):
+ rows = sess.execute(stmt).all()
+ eq_(
+ rows,
+ [(("d3d1", "d3d2"),), (("d4d1", "d4d2"),), (("d5d1", "d5d2"),)],
+ )
+
+ def test_as_scalars_future(self):
+ Data = self.classes.Data
+ sess = Session(testing.db)
+
+ b1 = Bundle("b1", Data.d1, Data.d2)
+
stmt = select(b1).filter(b1.c.d1.between("d3d1", "d5d1"))
eq_(
sess.execute(stmt).scalars().all(),
"SELECT row_number() OVER (ORDER BY data.id, data.d1, data.d2) "
"AS anon_1 FROM data",
)
+
+ def test_non_mapped_columns_non_single_entity(self):
+ data_table = self.tables.data
+
+ b1 = Bundle("b1", data_table.c.d1, data_table.c.d2)
+
+ sess = Session()
+ eq_(
+ sess.query(b1).filter(b1.c.d1.between("d3d1", "d5d1")).all(),
+ [(("d3d1", "d3d2"),), (("d4d1", "d4d2"),), (("d5d1", "d5d2"),)],
+ )
+
+ def test_non_mapped_columns_single_entity(self):
+ data_table = self.tables.data
+
+ b1 = Bundle("b1", data_table.c.d1, data_table.c.d2, single_entity=True)
+
+ sess = Session()
+ eq_(
+ sess.query(b1).filter(b1.c.d1.between("d3d1", "d5d1")).all(),
+ [("d3d1", "d3d2"), ("d4d1", "d4d2"), ("d5d1", "d5d2")],
+ )