executemany = False
compiled = None
statement = None
- _result_columns = None
+ result_column_struct = None
_is_implicit_returning = False
_is_explicit_returning = False
self.execution_options = compiled.statement._execution_options.union(
connection._execution_options)
- # compiled clauseelement. process bind params, process table defaults,
- # track collections used by ResultProxy to target and process results
-
- self._result_columns = compiled._result_columns
+ self.result_column_struct = (
+ compiled._result_columns, compiled._ordered_columns)
self.unicode_statement = util.text_type(compiled)
if not dialect.supports_unicode_statements:
self.cursor = self.create_cursor()
return self
@util.memoized_property
def result_map(self):
    """Return the compiled statement's result map when result
    columns are present, otherwise ``None``.
    """
    if not self._result_columns:
        return None
    return self.compiled.result_map
@util.memoized_property
def engine(self):
    """The engine that owns this context's root connection."""
    connection = self.root_connection
    return connection.engine
translate_colname = context._translate_colname
self.case_sensitive = case_sensitive = dialect.case_sensitive
- if context._result_columns:
- num_ctx_cols = len(context._result_columns)
+ if context.result_column_struct:
+ result_columns, cols_are_ordered = context.result_column_struct
+ num_ctx_cols = len(result_columns)
else:
num_ctx_cols = None
if num_ctx_cols and \
- context.compiled._ordered_columns and \
+ cols_are_ordered and \
num_ctx_cols == len(metadata):
# case 1 - SQL expression statement, number of columns
# in result matches number of cols in compiled. This is the
obj,
None
) for idx, (key, name, obj, type_)
- in enumerate(context._result_columns)
+ in enumerate(result_columns)
]
self.keys = [
- elem[1] for elem in context._result_columns
+ elem[1] for elem in result_columns
]
else:
# case 2 - raw string, or number of columns in result does
# In all these cases we fall back to the "named" approach
# that SQLAlchemy has used up through 0.9.
+ if num_ctx_cols:
+ result_map = self._create_result_map(result_columns)
+
raw = []
self.keys = []
untranslated = None
if num_ctx_cols:
try:
- ctx_rec = context.result_map[colname]
+ ctx_rec = result_map[colname]
except KeyError:
mapped_type = typemap.get(coltype, sqltypes.NULLTYPE)
obj = None
# ambiguous column exception when accessed.
if len(by_key) != num_ctx_cols:
seen = set()
- for idx in range(num_ctx_cols):
- key = raw[idx][1]
+ for rec in raw:
+ key = rec[1]
if key in seen:
by_key[key] = (None, by_key[key][1], None)
seen.add(key)
for elem in raw if elem[5]
])
+ @classmethod
+ def _create_result_map(cls, result_columns):
+ d = {}
+ for elem in result_columns:
+ key, rec = elem[0], elem[1:]
+ if key in d:
+ # conflicting keyname, just double up the list
+ # of objects. this will cause an "ambiguous name"
+ # error if an attempt is made by the result set to
+ # access.
+ e_name, e_obj, e_type = d[key]
+ d[key] = e_name, e_obj + rec[1], e_type
+ else:
+ d[key] = rec
+ return d
+
@util.pending_deprecation("0.8", "sqlite dialect uses "
"_translate_colname() now")
def _set_keymap_synonym(self, name, origname):
if self.positional and dialect.paramstyle == 'numeric':
self._apply_numbered_params()
@property
def result_map(self):
    """Assemble a mapping of key -> (name, objects, type) from the
    compiled ``_result_columns`` collection.

    Conflicting keys have their object collections doubled up so
    that a later lookup by that key raises an "ambiguous name"
    error at result-set access time.
    """
    mapping = {}
    for entry in self._result_columns:
        key, value = entry[0], entry[1:]
        if key not in mapping:
            mapping[key] = value
        else:
            name, objs, type_ = mapping[key]
            mapping[key] = (name, objs + value[1], type_)
    return mapping
-
@util.memoized_instancemethod
def _init_cte_state(self):
"""Initialize collections related to CTEs only if
compiled object, for those values that are present."""
return self.construct_params(_check=False)
+ @util.dependencies("sqlalchemy.engine.result")
+ def _create_result_map(self, result):
+ """utility method used for unit tests only."""
+ return result.ResultMetaData._create_result_map(self._result_columns)
+
def default_from(self):
"""Called when a SELECT statement has no froms, and no FROM clause is
to be appended.
stmt = select([t]).union(select([t]))
comp = stmt.compile()
eq_(
- comp.result_map,
+ comp._create_result_map(),
{'a': ('a', (t.c.a, 'a', 'a'), t.c.a.type),
'b': ('b', (t.c.b, 'b', 'b'), t.c.b.type)}
)
stmt = select([t.c.a]).select_from(t.join(subq, t.c.a == subq.c.a))
comp = stmt.compile()
eq_(
- comp.result_map,
+ comp._create_result_map(),
{'a': ('a', (t.c.a, 'a', 'a'), t.c.a.type)}
)
stmt = select([t.c.a]).union(select([t.c.b]))
comp = stmt.compile()
eq_(
- comp.result_map,
+ comp._create_result_map(),
{'a': ('a', (t.c.a, 'a', 'a'), t.c.a.type)},
)
tc = type_coerce(t.c.a, String)
stmt = select([t.c.a, l1, tc])
comp = stmt.compile()
- tc_anon_label = comp.result_map['a_1'][1][0]
+ tc_anon_label = comp._create_result_map()['a_1'][1][0]
eq_(
- comp.result_map,
+ comp._create_result_map(),
{
'a': ('a', (t.c.a, 'a', 'a'), t.c.a.type),
'bar': ('bar', (l1, 'bar'), l1.type),
t1.join(union, t1.c.a == union.c.t1_a)).apply_labels()
comp = stmt.compile()
eq_(
- set(comp.result_map),
+ set(comp._create_result_map()),
set(['t1_1_b', 't1_1_a', 't1_a', 't1_b'])
)
is_(
- comp.result_map['t1_a'][1][2], t1.c.a
+ comp._create_result_map()['t1_a'][1][2], t1.c.a
)
def test_insert_with_select_values(self):
    """INSERT..RETURNING with a SELECT in VALUES still produces a
    result map keyed on the returned column.
    """
    stmt = t2.insert().values(a=select([astring])).returning(aint)
    compiled = stmt.compile(dialect=postgresql.dialect())
    eq_(
        compiled._create_result_map(),
        {'a': ('a', (aint, 'a', 'a'), aint.type)},
    )
returning(aint)
comp = stmt.compile(dialect=postgresql.dialect())
eq_(
- comp.result_map,
+ comp._create_result_map(),
{'a': ('a', (aint, 'a', 'a'), aint.type)}
)
# .key in SQL
for key, col in zip([c.name for c in s.c], s.inner_columns):
key = key % compiled.anon_map
- assert col in compiled.result_map[key][1]
+ assert col in compiled._create_result_map()[key][1]
_a_bkeyselect_bkey = ""
table1 = self.table1
compiled = s.compile(dialect=self._length_fixture())
- assert set(compiled.result_map['some_large_named_table__2'][1]).\
+ assert set(compiled._create_result_map()['some_large_named_table__2'][1]).\
issuperset(
[
'some_large_named_table_this_is_the_data_column',
]
)
- assert set(compiled.result_map['some_large_named_table__1'][1]).\
+ assert set(compiled._create_result_map()['some_large_named_table__1'][1]).\
issuperset(
[
'some_large_named_table_this_is_the_primarykey_column',
s2 = select([s])
compiled = s2.compile(dialect=self._length_fixture())
assert \
- set(compiled.result_map['this_is_the_data_column'][1]).\
+ set(compiled._create_result_map()['this_is_the_data_column'][1]).\
issuperset(['this_is_the_data_column',
s.c.this_is_the_data_column])
assert \
- set(compiled.result_map['this_is_the_primarykey_column'][1]).\
+ set(compiled._create_result_map()['this_is_the_primarykey_column'][1]).\
issuperset(['this_is_the_primarykey_column',
s.c.this_is_the_primarykey_column])
') '
'AS anon_1', dialect=dialect)
compiled = s.compile(dialect=dialect)
- assert set(compiled.result_map['anon_1_this_is_the_data_2'][1]).\
+ assert set(compiled._create_result_map()['anon_1_this_is_the_data_2'][1]).\
issuperset([
'anon_1_this_is_the_data_2',
q.corresponding_column(
table1.c.this_is_the_data_column)
])
- assert set(compiled.result_map['anon_1_this_is_the_prim_1'][1]).\
+ assert set(compiled._create_result_map()['anon_1_this_is_the_prim_1'][1]).\
issuperset([
'anon_1_this_is_the_prim_1',
q.corresponding_column(
dialect = default.DefaultDialect(label_length=10)
compiled = q.compile(dialect=dialect)
- assert set(compiled.result_map['some_2'][1]).issuperset([
+ assert set(compiled._create_result_map()['some_2'][1]).issuperset([
table1.c.this_is_the_data_column,
'some_large_named_table_this_is_the_data_column',
'some_2'
])
- assert set(compiled.result_map['some_1'][1]).issuperset([
+ assert set(compiled._create_result_map()['some_1'][1]).issuperset([
table1.c.this_is_the_primarykey_column,
'some_large_named_table_this_is_the_primarykey_column',
'some_1'
dialect = default.DefaultDialect(label_length=10)
compiled = x.compile(dialect=dialect)
- assert set(compiled.result_map['this_2'][1]).issuperset([
+ assert set(compiled._create_result_map()['this_2'][1]).issuperset([
q.corresponding_column(table1.c.this_is_the_data_column),
'this_is_the_data_column',
'this_2'])
- assert set(compiled.result_map['this_1'][1]).issuperset([
+ assert set(compiled._create_result_map()['this_1'][1]).issuperset([
q.corresponding_column(table1.c.this_is_the_primarykey_column),
'this_is_the_primarykey_column',
'this_1'])
'SELECT asdf.abcde FROM a AS asdf',
dialect=dialect)
compiled = s.compile(dialect=dialect)
- assert set(compiled.result_map['abcde'][1]).issuperset([
+ assert set(compiled._create_result_map()['abcde'][1]).issuperset([
'abcde', a1.c.abcde, 'abcde'])
# column still there, but short label
'SELECT asdf.abcde AS _1 FROM a AS asdf',
dialect=dialect)
compiled = s.compile(dialect=dialect)
- assert set(compiled.result_map['_1'][1]).issuperset([
+ assert set(compiled._create_result_map()['_1'][1]).issuperset([
'asdf_abcde', a1.c.abcde, '_1'])
def _assert_result_keys(self, s, keys):
    """Assert the compiled statement's result-map keys equal *keys*."""
    result_map = s.compile()._create_result_map()
    eq_(set(result_map), set(keys))
def _assert_subq_result_keys(self, s, keys):
    """Assert result-map keys for *s* wrapped in an enclosing SELECT."""
    subquery_stmt = s.select()
    result_map = subquery_stmt.compile()._create_result_map()
    eq_(set(result_map), set(keys))
def _names_overlap(self):
m = MetaData()
compiled = stmt.compile()
return dict(
(elem, key)
- for key, elements in compiled.result_map.items()
+ for key, elements in compiled._create_result_map().items()
for elem in elements[1]
)
)
compiled = t.compile()
- eq_(compiled.result_map,
+ eq_(compiled._create_result_map(),
{'id': ('id',
(t.c.id._proxies[0],
'id',
t = text("select id, name from user").columns(id=Integer, name=String)
compiled = t.compile()
- eq_(compiled.result_map,
+ eq_(compiled._create_result_map(),
{'id': ('id',
(t.c.id._proxies[0],
'id',
table1.join(t, table1.c.myid == t.c.id))
compiled = stmt.compile()
eq_(
- compiled.result_map,
+ compiled._create_result_map(),
{
"myid": ("myid",
(table1.c.myid, "myid", "myid"), table1.c.myid.type),
compiled = stmt.compile()
return dict(
(elem, key)
- for key, elements in compiled.result_map.items()
+ for key, elements in compiled._create_result_map().items()
for elem in elements[1]
)
table = self._fixture()
compiled = select([table]).apply_labels().compile()
- assert table.c.y in compiled.result_map['test_table_y'][1]
- assert table.c.x in compiled.result_map['test_table_x'][1]
+ assert table.c.y in compiled._create_result_map()['test_table_y'][1]
+ assert table.c.x in compiled._create_result_map()['test_table_x'][1]
# the lower() function goes into the result_map, we don't really
# need this but it's fine
self.assert_compile(
- compiled.result_map['test_table_y'][1][2],
+ compiled._create_result_map()['test_table_y'][1][2],
"lower(test_table.y)"
)
# then the original column gets put in there as well.
# it's not important that it's the last value.
self.assert_compile(
- compiled.result_map['test_table_y'][1][-1],
+ compiled._create_result_map()['test_table_y'][1][-1],
"test_table.y"
)