def _iterate_polymorphic_properties(self, mappers=None):
if mappers is None:
mappers = self._with_polymorphic_mappers
- return iter(util.OrderedSet(
+ return iter(util.unique_list(
chain(*[list(mapper.iterate_properties) for mapper in [self] + mappers])
))
if manager is not None:
if manager.class_ is not self.class_:
# An inherited manager. Install one for this subclass.
- raise "eh?"
+ # TODO: no coverage here
manager = None
elif manager.mapper:
raise sa_exc.ArgumentError(
A ``sqlalchemy.engine.base.RowProxy`` instance or a
dictionary corresponding result-set ``ColumnElement``
instances to their values within a row.
- """
+ """
pk_cols = self.primary_key
if adapter:
pk_cols = [adapter.columns[c] for c in pk_cols]
primary_key
A list of values indicating the identifier.
+
"""
return (self._identity_class, tuple(util.to_list(primary_key)))
def primary_key_from_instance(self, instance):
"""Return the list of primary key values for the given
instance.
+
"""
state = attributes.instance_state(instance)
return self._primary_key_from_state(state)
else:
raise exc.UnmappedColumnError("No column %s is configured on mapper %s..." % (column, self))
- # TODO: improve names
+ # TODO: improve names?
def _get_state_attr_by_column(self, state, column):
return self._get_col_to_prop(column).getattr(state, column)
if filtered:
if single_entity:
- filter = util.OrderedIdentitySet
+ filter = lambda x: util.unique_list(x, util.IdentitySet)
else:
- filter = util.OrderedSet
+ filter = util.unique_list
else:
filter = None
self.stack.append({'from':correlate_froms, 'iswrapper':iswrapper})
# the actual list of columns to print in the SELECT column list.
- inner_columns = util.OrderedSet(
- [c for c in [
+ inner_columns = util.unique_list(
+ c for c in [
self.process(
self.label_select_column(select, co, asfrom=asfrom),
within_columns_clause=True,
**column_clause_args)
for co in select.inner_columns
]
- if c is not None]
+ if c is not None
)
-
- text = " ".join(["SELECT"] + [self.process(x) for x in select._prefixes]) + " "
+
+ text = "SELECT " # we're off to a good start !
+ if select._prefixes:
+ text += " ".join(self.process(x) for x in select._prefixes) + " "
text += self.get_select_precolumns(select)
text += ', '.join(inner_columns)
self.initial_quote = initial_quote
self.final_quote = final_quote or self.initial_quote
self.omit_schema = omit_schema
- self.__strings = {}
-
+ self._strings = {}
+
def _escape_identifier(self, value):
"""Escape an identifier.
or self.illegal_initial_characters.match(value[0])
or not self.legal_characters.match(unicode(value))
or (lc_value != value))
-
+
def quote(self, ident, force):
- if force:
+ if force is None:
+ if ident in self._strings:
+ return self._strings[ident]
+ else:
+ if self._requires_quotes(ident):
+ self._strings[ident] = self.quote_identifier(ident)
+ else:
+ self._strings[ident] = ident
+ return self._strings[ident]
+ elif force:
return self.quote_identifier(ident)
- elif force is False:
- return ident
-
- if ident in self.__strings:
- return self.__strings[ident]
else:
- if self._requires_quotes(ident):
- self.__strings[ident] = self.quote_identifier(ident)
- else:
- self.__strings[ident] = ident
- return self.__strings[ident]
+ return ident
def format_sequence(self, sequence, use_schema=True):
name = self.quote(sequence.name, sequence.quote)
"""
all_overlap = set(_expand_cloned(a)).intersection(_expand_cloned(b))
- return a.intersection(
- [
- elem for elem in a if all_overlap.intersection(elem._cloned_set)
- ]
- )
+ return a.intersection(elem for elem in a if all_overlap.intersection(elem._cloned_set))
def _compound_select(keyword, *selects, **kwargs):
return CompoundSelect(keyword, *selects, **kwargs)
def _is_literal(element):
- global schema
- if not schema:
- from sqlalchemy import schema
- return not isinstance(element, (ClauseElement, Operators, schema.SchemaItem))
+ global _is_literal
+ from sqlalchemy import schema
+ def _is_literal(element):
+ return not isinstance(element, (ClauseElement, Operators, schema.SchemaItem))
+ return _is_literal(element)
def _from_objects(*elements, **kwargs):
return itertools.chain(*[element._get_from_objects(**kwargs) for element in elements])
Select statements support appendable clauses, as well as the
ability to execute themselves and return a result set.
- """
+ """
def __init__(self, columns, whereclause=None, from_obj=None, distinct=False, having=None, correlate=True, prefixes=None, **kwargs):
"""Construct a Select object.
Additional generative and mutator methods are available on the
[sqlalchemy.sql.expression#_SelectBaseMixin] superclass.
- """
+ """
self._should_correlate = correlate
self._distinct = distinct
def __init__(self, creator):
self.creator = creator
+
def __missing__(self, key):
self[key] = val = self.creator(key)
return val
def __init__(self, creator):
self.creator = creator
+
def __getitem__(self, key):
try:
return dict.__getitem__(self, key)
for o in iterable:
self.add(o)
+def unique_list(seq, compare_with=set):
+ seen = compare_with()
+ return [x for x in seq if x not in seen and not seen.add(x)]
class UniqueAppender(object):
"""Appends items to a collection ensuring uniqueness.
Column('c1', Integer, primary_key=True),
Column('c2', String(30)))
- @profiling.function_call_count(72, {'2.3': 44, '2.4': 46})
+ @profiling.function_call_count(72, {'2.4': 46})
def test_insert(self):
t1.insert().compile()
- @profiling.function_call_count(68, {'2.3': 47, '2.4': 42})
+ @profiling.function_call_count(70, {'2.4': 42})
def test_update(self):
t1.update().compile()
- @profiling.function_call_count(228, versions={'2.3': 153, '2.4':131})
+ @profiling.function_call_count(228, versions={'2.4':131})
def test_select(self):
s = select([t1], t1.c.c2==t2.c.c1)
s.compile()
-
if __name__ == '__main__':
testenv.main()