=======
CHANGES
=======
+0.5.2
+======
+- sql
+ - Further fixes to the "percent signs and spaces in column/table
+ names" functionality. [ticket:1284]
+
0.5.1
========
- orm
+ - Removed an internal join cache which could potentially leak
+ memory when issuing query.join() repeatedly to ad-hoc
+ selectables.
+
+ - The "clear()", "save()", "update()", "save_or_update()"
+ Session methods have been deprecated, replaced by
+ "expunge_all()" and "add()". "expunge_all()" has also
+ been added to ScopedSession.
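+
+      A minimal before/after sketch (``sess`` and ``obj`` are
+      hypothetical)::
+
+        sess.save(obj)          # deprecated
+        sess.add(obj)           # replacement
+        sess.clear()            # deprecated
+        sess.expunge_all()      # replacement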
+
- Modernized the "no mapped table" exception and added a more
explicit __table__/__tablename__ exception to declarative.
- Test coverage added for `relation()` objects specified on
concrete mappers. [ticket:1237]
-
+
+ - Query.from_self() as well as query.subquery() both disable
+ the rendering of eager joins inside the subquery produced.
+ The "disable all eager joins" feature is available publically
+ via a new query.enable_eagerloads() generative. [ticket:1276]
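+
+      For illustration (``sess`` and ``User`` are hypothetical)::
+
+        # neither statement below renders eager joins
+        stmt = sess.query(User).enable_eagerloads(False).statement
+        subq = sess.query(User).subquery()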
+
+ - Added a rudimentary series of set operations to Query that
+ receive Query objects as arguments, including union(),
+ union_all(), intersect(), except_(), intersect_all(),
+ except_all(). See the API documentation for
+ Query.union() for examples.
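+
+      A short sketch (``sess`` and ``User`` are hypothetical)::
+
+        q1 = sess.query(User).filter(User.name == 'ed')
+        q2 = sess.query(User).filter(User.name == 'fred')
+        q1.union(q2).all()
+        q1.intersect(q2).all()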
+
+ - Fixed bug that prevented Query.join() and eagerloads from
+ attaching to a query that selected from a union or aliased union.
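+
+      e.g. the following pattern, assuming a hypothetical 'addresses'
+      relation, now works::
+
+        q1.union(q2).join('addresses').all()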
+
- A short documentation example added for bidirectional
relations specified on concrete mappers. [ticket:1237]
which contained no defined values for the child table where
an UPDATE with no SET clause would be rendered.
+ - Using delete-orphan on a many-to-many relation is deprecated.
+ This produces misleading or erroneous results since SQLA does
+ not retrieve the full list of "parents" for m2m. To get delete-orphan
+ behavior with an m2m table, use an explicit association class
+ so that the individual association row is treated as a parent.
+ [ticket:1281]
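+
+      A rough sketch of the association-class approach (all names are
+      hypothetical; the association table is assumed to have its own
+      primary key)::
+
+        class AtoB(object):
+            pass
+        mapper(AtoB, atob_table)
+        mapper(A, a_table, properties={
+            'atobs': relation(AtoB, cascade="all, delete-orphan")
+        })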
+
+ - delete-orphan cascade always requires delete cascade. Specifying
+ delete-orphan without delete now raises a deprecation warning.
+ [ticket:1281]
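+
+      e.g. (illustrative)::
+
+        relation(Child, cascade="all, delete, delete-orphan")  # ok
+        relation(Child, cascade="delete-orphan")               # now warns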
+
- sql
- Improved the methodology for handling percent signs in column
names from [ticket:1256]. Added more tests. MySQL and
properly.
- mssql
+ - Corrected handling of large decimal values with more robust
+ tests. Removed string manipulation on floats. [ticket:1280]
+
- Modified the do_begin handling in mssql to use the Cursor, not
  the Connection, so it is DBAPI compatible.
+++ /dev/null
-Trunk of SQLAlchemy is now the pre-0.5 version. This version removes
-many things which were deprecated in 0.4 and therefore is not
-backwards compatible with all 0.4 appliactions.
-
-A work in progress describing the changes from 0.4 is at:
-
- http://www.sqlalchemy.org/trac/wiki/05Migration
-
-IMPORTANT: some file names have changed in this branch. Remove all
-existing *.pyc files before using!
-
-To continue working with the current development revision of version
-0.4, switch this working copy to the 0.4 maintenance branch:
-
- svn switch http://svn.sqlalchemy.org/sqlalchemy/branches/rel_0_4
__all__ = sorted(name for name, obj in locals().items()
if not (name.startswith('_') or inspect.ismodule(obj)))
-__version__ = '0.5.1'
+
+__version__ = '0.5.2'
del inspect, sys
# Not sure that this exception is needed
return value
else:
- # FIXME: this will not correct a situation where a float
- # gets converted to e-notation.
- if isinstance(value, decimal.Decimal) and value._exp < -6:
- value = ((value < 0 and '-' or '')
- + '0.'
- + '0' * -(value._exp+1)
- + value._int)
- return value
+ if isinstance(value, decimal.Decimal):
+ sign = (value < 0 and '-' or '')
+ if value._exp > -1:
+ return float(sign + value._int + '0' * value._exp)
+ else:
+ s = value._int.zfill(-value._exp+1)
+ pos = len(s) + value._exp
+ return sign + s[:pos] + '.' + s[pos:]
else:
- return str(value)
+ return value
+
return process
def get_col_spec(self):
else:
return "FLOAT(%(precision)s)" % {'precision': self.precision}
- def bind_processor(self, dialect):
- def process(value):
- """By converting to string, we can use Decimal types round-trip."""
- if not value is None:
- return str(value)
- return None
- return process
-
class MSReal(MSFloat):
"""A type for ``real`` numbers."""
def adapt(self, impltype):
return impltype()
- def bind_processor(self, dialect):
- def process(value):
- if value is not None:
- return float(value)
- else:
- return value
- return process
-
def get_col_spec(self):
return "REAL"
>>> db.loans.count()
1
>>> _ = db.loans.insert(book_id=1, user_name='Bhargan Basepair')
- >>> db.clear()
+ >>> db.expunge_all()
>>> db.flush()
>>> db.loans.count()
1
Session.flush()
def clear(self):
- Session.clear()
+ Session.expunge_all()
+
+ def expunge_all(self):
+ Session.expunge_all()
def map(self, selectable, **kwargs):
try:
self.join_depth = join_depth
self.local_remote_pairs = _local_remote_pairs
self.extension = extension
- self.__join_cache = {}
self.comparator_factory = comparator_factory or RelationProperty.Comparator
self.comparator = self.comparator_factory(self, None)
util.set_creation_order(self)
"- foreign key columns are present in both the parent and "
"the child's mapped tables. Specify 'foreign_keys' "
"argument." % (str(self)))
-
+
+ if self.cascade.delete_orphan and self.direction is MANYTOMANY:
+ util.warn("On %s, delete-orphan cascade is not supported on a "
+ "many-to-many relation. This will raise an error in 0.6." % self)
+
def _determine_local_remote_pairs(self):
if not self.local_remote_pairs:
if self.remote_side:
return self.mapper.common_parent(self.parent)
def _create_joins(self, source_polymorphic=False, source_selectable=None, dest_polymorphic=False, dest_selectable=None, of_type=None):
- key = util.WeakCompositeKey(source_polymorphic, source_selectable, dest_polymorphic, dest_selectable, of_type)
- try:
- return self.__join_cache[key]
- except KeyError:
- pass
-
if source_selectable is None:
if source_polymorphic and self.parent.with_polymorphic:
source_selectable = self.parent._with_polymorphic_selectable
else:
target_adapter = None
- self.__join_cache[key] = ret = (primaryjoin, secondaryjoin,
+ return (primaryjoin, secondaryjoin,
(source_selectable or self.parent.local_table),
(dest_selectable or self.mapper.local_table), secondary, target_adapter)
- return ret
def _get_join(self, parent, primary=True, secondary=True, polymorphic_parent=True):
"""deprecated. use primary_join_against(), secondary_join_against(), full_join_against()"""
self._correlate = set()
self._joinpoint = None
self._with_labels = False
+ self._enable_eagerloads = True
self.__joinable_tables = None
self._having = None
self._populate_existing = False
def __set_select_from(self, from_obj):
if isinstance(from_obj, expression._SelectBaseMixin):
- # alias SELECTs and unions
from_obj = from_obj.alias()
self._from_obj = from_obj
equivs = self.__all_equivs()
if isinstance(from_obj, expression.Alias):
- # dont alias a regular join (since its not an alias itself)
self._from_obj_alias = sql_util.ColumnAdapter(self._from_obj, equivs)
-
+
def _get_polymorphic_adapter(self, entity, selectable):
self.__mapper_loads_polymorphically_with(entity.mapper, sql_util.ColumnAdapter(selectable, entity.mapper._equivalent_columns))
@property
def statement(self):
"""The full SELECT statement represented by this Query."""
+
return self._compile_context(labels=self._with_labels).statement._annotate({'_halt_adapt': True})
+ @property
+ def _nested_statement(self):
+ return self.with_labels().enable_eagerloads(False).statement.correlate(None)
+
def subquery(self):
- """return the full SELECT statement represented by this Query, embedded within an Alias."""
+ """return the full SELECT statement represented by this Query, embedded within an Alias.
+
+ Eager JOIN generation within the query is disabled.
+
+ """
- return self.statement.alias()
+ return self.enable_eagerloads(False).statement.alias()
+ @_generative()
+ def enable_eagerloads(self, value):
+ """Control whether or not eager joins are rendered.
+
+ When set to False, the returned Query will not render
+ eager joins regardless of eagerload() options
+ or mapper-level lazy=False configurations.
+
+ This is used primarily when nesting the Query's
+ statement into a subquery or other
+ selectable.
+
+ """
+ self._enable_eagerloads = value
+
@_generative()
def with_labels(self):
"""Apply column labels to the return value of Query.statement.
m = _MapperEntity(self, entity)
self.__setup_aliasizers([m])
- @_generative()
def from_self(self, *entities):
"""return a Query that selects from this Query's SELECT statement.
\*entities - optional list of entities which will replace
those being selected.
+
"""
+ fromclause = self._nested_statement
+ q = self._from_selectable(fromclause)
+ if entities:
+ q._set_entities(entities)
+ return q
+
+ _from_self = from_self
- fromclause = self.with_labels().statement.correlate(None)
+ @_generative()
+ def _from_selectable(self, fromclause):
self._statement = self._criterion = None
self._order_by = self._group_by = self._distinct = False
self._limit = self._offset = None
self.__set_select_from(fromclause)
- if entities:
- self._set_entities(entities)
-
- _from_self = from_self
def values(self, *columns):
"""Return an iterator yielding result tuples corresponding to the given list of columns"""
else:
self._having = criterion
+ def union(self, *q):
+ """Produce a UNION of this Query against one or more queries.
+
+ e.g.::
+
+ q1 = sess.query(SomeClass).filter(SomeClass.foo=='bar')
+ q2 = sess.query(SomeClass).filter(SomeClass.bar=='foo')
+
+ q3 = q1.union(q2)
+
+ The method accepts multiple Query objects so as to control
+ the level of nesting. A series of ``union()`` calls such as::
+
+ x.union(y).union(z).all()
+
+ will nest on each ``union()``, and produces::
+
+ SELECT * FROM (SELECT * FROM (SELECT * FROM x UNION SELECT * FROM y) UNION SELECT * FROM z)
+
+ Whereas::
+
+ x.union(y, z).all()
+
+ produces::
+
+ SELECT * FROM (SELECT * FROM x UNION SELECT * FROM y UNION SELECT * FROM z)
+
+ """
+ return self._from_selectable(
+ expression.union(*([self._nested_statement]+ [x._nested_statement for x in q])))
+
+ def union_all(self, *q):
+ """Produce a UNION ALL of this Query against one or more queries.
+
+ Works the same way as :meth:`~sqlalchemy.orm.query.Query.union`. See that
+ method for usage examples.
+
+ """
+ return self._from_selectable(
+ expression.union_all(*([self._nested_statement]+ [x._nested_statement for x in q]))
+ )
+
+ def intersect(self, *q):
+ """Produce an INTERSECT of this Query against one or more queries.
+
+ Works the same way as :meth:`~sqlalchemy.orm.query.Query.union`. See that
+ method for usage examples.
+
+ """
+ return self._from_selectable(
+ expression.intersect(*([self._nested_statement]+ [x._nested_statement for x in q]))
+ )
+
+ def intersect_all(self, *q):
+ """Produce an INTERSECT ALL of this Query against one or more queries.
+
+ Works the same way as :meth:`~sqlalchemy.orm.query.Query.union`. See that
+ method for usage examples.
+
+ """
+ return self._from_selectable(
+ expression.intersect_all(*([self._nested_statement]+ [x._nested_statement for x in q]))
+ )
+
+ def except_(self, *q):
+ """Produce an EXCEPT of this Query against one or more queries.
+
+ Works the same way as :meth:`~sqlalchemy.orm.query.Query.union`. See that
+ method for usage examples.
+
+ """
+ return self._from_selectable(
+ expression.except_(*([self._nested_statement]+ [x._nested_statement for x in q]))
+ )
+
+ def except_all(self, *q):
+ """Produce an EXCEPT ALL of this Query against one or more queries.
+
+ Works the same way as :meth:`~sqlalchemy.orm.query.Query.union`. See that
+ method for usage examples.
+
+ """
+ return self._from_selectable(
+ expression.except_all(*([self._nested_statement]+ [x._nested_statement for x in q]))
+ )
+
@util.accepts_a_list_as_starargs(list_deprecation='pending')
def join(self, *props, **kwargs):
"""Create a join against this ``Query`` object's criterion
self.primary_columns = []
self.secondary_columns = []
self.eager_order_by = []
-
+ self.enable_eagerloads = query._enable_eagerloads
self.eager_joins = {}
self.froms = []
self.adapter = None
public_methods = (
'__contains__', '__iter__', 'add', 'add_all', 'begin', 'begin_nested',
'clear', 'close', 'commit', 'connection', 'delete', 'execute', 'expire',
- 'expire_all', 'expunge', 'flush', 'get_bind', 'is_modified',
+ 'expire_all', 'expunge', 'expunge_all', 'flush', 'get_bind', 'is_modified',
'merge', 'query', 'refresh', 'rollback', 'save',
'save_or_update', 'scalar', 'update')
not use any connection resources until they are first needed.
"""
- self.clear()
+ self.expunge_all()
if self.transaction is not None:
for transaction in self.transaction._iterate_parents():
transaction.close()
+ @classmethod
def close_all(cls):
"""Close *all* sessions in memory."""
for sess in _sessions.values():
sess.close()
- close_all = classmethod(close_all)
def expunge_all(self):
"""Remove all object instances from this ``Session``.
self.identity_map = self._identity_cls()
self._new = {}
self._deleted = {}
- clear = expunge_all
- # TODO: deprecate
- #clear = util.deprecated()(expunge_all)
+ clear = util.deprecated("Use session.expunge_all()")(expunge_all)
# TODO: need much more test coverage for bind_mapper() and similar !
# TODO: + crystalize + document resolution order vis. bind_mapper/bind_table
self.identity_map.discard(state)
self._deleted.pop(state, None)
- @util.pending_deprecation('0.5.x', "Use session.add()")
+ @util.deprecated("Use session.add()")
def save(self, instance):
"""Add a transient (unsaved) instance to this ``Session``.
state = _state_for_unsaved_instance(instance, create=True)
self._save_impl(state)
- @util.pending_deprecation('0.5.x', "Use session.add()")
+ @util.deprecated("Use session.add()")
def update(self, instance):
"""Bring a detached (saved) instance into this ``Session``.
self._cascade_save_or_update(state)
save_or_update = (
- util.pending_deprecation('0.5.x', "Use session.add()")(add))
+ util.deprecated("Use session.add()")(add))
def _cascade_save_or_update(self, state):
for state, mapper in _cascade_unknown_state_iterator('save-update', state, halt_on=lambda c:c in self):
merged = mapper.class_manager.new_instance()
merged_state = attributes.instance_state(merged)
new_instance = True
- self.save(merged)
+ self.add(merged)
_recursive[instance] = merged
def init(self):
super(EagerLoader, self).init()
- self.clauses = {}
self.join_depth = self.parent_property.join_depth
def init_class_attribute(self):
def setup_query(self, context, entity, path, adapter, column_collection=None, parentmapper=None, **kwargs):
"""Add a left outer join to the statement thats being constructed."""
+ if not context.enable_eagerloads:
+ return
+
path = path + (self.key,)
# check for user-defined eager alias
# whether or not the Query will wrap the selectable in a subquery,
# and then attach eager load joins to that (i.e., in the case of LIMIT/OFFSET etc.)
should_nest_selectable = context.query._should_nest_selectable
-
+
if entity in context.eager_joins:
entity_key, default_towrap = entity, entity.selectable
elif should_nest_selectable or not context.from_clause or not sql_util.search(context.from_clause, entity.selectable):
towrap = context.eager_joins.setdefault(entity_key, default_towrap)
- # create AliasedClauses object to build up the eager query. this is cached after 1st creation.
- # this also allows ORMJoin to cache the aliased joins it produces since we pass the same
- # args each time in the typical case.
- path_key = util.WeakCompositeKey(*path)
- try:
- clauses = self.clauses[path_key]
- except KeyError:
- self.clauses[path_key] = clauses = mapperutil.ORMAdapter(mapperutil.AliasedClass(self.mapper),
+ # create AliasedClauses object to build up the eager query.
+ clauses = mapperutil.ORMAdapter(mapperutil.AliasedClass(self.mapper),
equivalents=self.mapper._equivalent_columns)
if adapter:
self.merge = "merge" in values or "all" in values
self.expunge = "expunge" in values or "all" in values
self.refresh_expire = "refresh-expire" in values or "all" in values
+
+ if self.delete_orphan and not self.delete:
+ util.warn("The 'delete-orphan' cascade option requires "
+ "'delete'. This will raise an error in 0.6.")
+
for x in values:
if x not in all_cascades:
raise sa_exc.ArgumentError("Invalid cascade option '%s'" % x)
if isinstance(onclause, basestring):
prop = left_mapper.get_property(onclause)
elif isinstance(onclause, attributes.QueryableAttribute):
+ # TODO: we might want to honor the current adapt_from,
+ # if already set. we would need to adjust how we calculate
+ # adapt_from though since it is present in too many cases
+ # at the moment (query tests illustrate that).
adapt_from = onclause.__clause_element__()
prop = onclause.property
elif isinstance(onclause, MapperProperty):
return name % self.anon_map
def _process_anon(self, key):
- (ident, derived) = key.split(' ')
-
+ (ident, derived) = key.split(' ', 1)
anonymous_counter = self.anon_map.get(derived, 1)
self.anon_map[derived] = anonymous_counter + 1
return derived + "_" + str(anonymous_counter)
def visit_alias(self, alias, asfrom=False, **kwargs):
if asfrom:
- return self.process(alias.original, asfrom=True, **kwargs) + " AS " + self.preparer.format_alias(alias, alias.name % self.anon_map)
+ return self.process(alias.original, asfrom=True, **kwargs) + " AS " + \
+ self.preparer.format_alias(alias, alias.name % self.anon_map)
else:
return self.process(alias.original, **kwargs)
expressions and function calls.
"""
- return _generated_label("%%(%d %s)s" % (id(self), _escape_for_generated(getattr(self, 'name', 'anon'))))
+ return _generated_label("%%(%d %s)s" % (id(self), getattr(self, 'name', 'anon')))
class ColumnCollection(util.OrderedProperties):
"""An ordered dictionary that stores a list of ColumnElement
"""
if unique:
- self.key = _generated_label("%%(%d %s)s" % (id(self), key and _escape_for_generated(key) or 'param'))
+ self.key = _generated_label("%%(%d %s)s" % (id(self), key or 'param'))
else:
self.key = key or _generated_label("%%(%d param)s" % id(self))
self._orig_key = key or 'param'
def _clone(self):
c = ClauseElement._clone(self)
if self.unique:
- c.key = _generated_label("%%(%d %s)s" % (id(c), c._orig_key and _escape_for_generated(c._orig_key) or 'param'))
+ c.key = _generated_label("%%(%d %s)s" % (id(c), c._orig_key or 'param'))
return c
def _convert_to_unique(self):
if not self.unique:
self.unique = True
- self.key = _generated_label("%%(%d %s)s" % (id(self), self._orig_key and _escape_for_generated(self._orig_key) or 'param'))
+ self.key = _generated_label("%%(%d %s)s" % (id(self), self._orig_key or 'param'))
def bind_processor(self, dialect):
return self.type.dialect_impl(dialect).bind_processor(dialect)
if alias is None:
if self.original.named_with_column:
alias = getattr(self.original, 'name', None)
- alias = _generated_label('%%(%d %s)s' % (id(self), alias and _escape_for_generated(alias) or 'anon'))
+ alias = _generated_label('%%(%d %s)s' % (id(self), alias or 'anon'))
self.name = alias
@property
def __init__(self, name, element, type_=None):
while isinstance(element, _Label):
element = element.element
- self.name = self.key = self._label = name or _generated_label("%%(%d %s)s" % (id(self), _escape_for_generated(getattr(element, 'name', 'anon'))))
+ self.name = self.key = self._label = name or _generated_label("%%(%d %s)s" % (id(self), getattr(element, 'name', 'anon')))
self._element = element
self._type = type_
self.quote = element.quote
def self_group(self, against=None):
return _FromGrouping(self)
+ def is_derived_from(self, fromclause):
+ for s in self.selects:
+ if s.is_derived_from(fromclause):
+ return True
+ return False
+
def _populate_column_collection(self):
for cols in zip(*[s.c for s in self.selects]):
proxy = cols[0]._make_proxy(self, name=self.use_labels and cols[0]._label or None)
params (and the reverse for result sets.)
"""
-
__visit_name__ = 'text'
class Unicode(String):
except AttributeError:
pass
-class WeakCompositeKey(object):
- """an weak-referencable, hashable collection which is strongly referenced
- until any one of its members is garbage collected.
-
- """
- keys = set()
-
- __slots__ = 'args', '__weakref__'
-
- def __init__(self, *args):
- self.args = [self.__ref(arg) for arg in args]
- WeakCompositeKey.keys.add(self)
-
- def __ref(self, arg):
- if isinstance(arg, type):
- return weakref.ref(arg, self.__remover)
- else:
- return lambda: arg
-
- def __remover(self, wr):
- WeakCompositeKey.keys.discard(self)
-
- def __hash__(self):
- return hash(tuple(self))
-
- def __cmp__(self, other):
- return cmp(tuple(self), tuple(other))
-
- def __iter__(self):
- return iter(arg() for arg in self.args)
-
-
class _symbol(object):
def __init__(self, name):
"""Construct a new named symbol."""
from testlib import TestBase
from testlib.testing import eq_, is_, ne_
-
class OrderedDictTest(TestBase):
def test_odict(self):
o = util.OrderedDict()
eq_(set(util.class_hierarchy(Mixin)), set())
eq_(set(util.class_hierarchy(A)), set((A, B, object)))
+
if __name__ == "__main__":
testenv.main()
sess = create_session(bind=testing.db)
for i in range(100):
- sess.save(Foo(bar=i, range=i%10))
+ sess.add(Foo(bar=i, range=i%10))
sess.flush()
def tearDownAll(self):
def setUpAll(self):
global numeric_table, metadata
metadata = MetaData(testing.db)
+
+ def tearDown(self):
+ metadata.drop_all()
+
+ def test_decimal_notation(self):
+ import decimal
numeric_table = Table('numeric_table', metadata,
Column('id', Integer, Sequence('numeric_id_seq', optional=True), primary_key=True),
- Column('numericcol', Numeric(asdecimal=False))
+ Column('numericcol', Numeric(precision=38, scale=20, asdecimal=True))
)
metadata.create_all()
- def tearDownAll(self):
- metadata.drop_all()
+ try:
+ test_items = [decimal.Decimal(d) for d in '1500000.00000000000000000000',
+ '-1500000.00000000000000000000', '1500000',
+ '0.0000000000000000002', '0.2', '-0.0000000000000000002',
+ '156666.458923543', '-156666.458923543', '1', '-1', '1234',
+ '2E-12', '4E8', '3E-6', '3E-7', '4.1', '1E-1', '1E-2', '1E-3',
+ '1E-4', '1E-5', '1E-6', '1E-7', '1E-8']
+ for value in test_items:
+ numeric_table.insert().execute(numericcol=value)
+
+ for value in select([numeric_table.c.numericcol]).execute():
+ self.assertTrue(value[0] in test_items, "%s not in test_items" % value[0])
- def tearDown(self):
- numeric_table.delete().execute()
+ except Exception, e:
+ raise e
- def test_decimal_e_notation(self):
- from decimal import Decimal
+ def test_float(self):
+ float_table = Table('float_table', metadata,
+ Column('id', Integer, Sequence('numeric_id_seq', optional=True), primary_key=True),
+ Column('floatcol', Float())
+ )
+ metadata.create_all()
try:
- numeric_table.insert().execute(numericcol=Decimal('4.1'))
- numeric_table.insert().execute(numericcol=Decimal('1E-1'))
- numeric_table.insert().execute(numericcol=Decimal('1E-2'))
- numeric_table.insert().execute(numericcol=Decimal('1E-3'))
- numeric_table.insert().execute(numericcol=Decimal('1E-4'))
- numeric_table.insert().execute(numericcol=Decimal('1E-5'))
- numeric_table.insert().execute(numericcol=Decimal('1E-6'))
- numeric_table.insert().execute(numericcol=Decimal('1E-7'))
- numeric_table.insert().execute(numericcol=Decimal('1E-8'))
- numeric_table.insert().execute(numericcol=10000)
+ test_items = [float(d) for d in '1500000.00000000000000000000',
+ '-1500000.00000000000000000000', '1500000',
+ '0.0000000000000000002', '0.2', '-0.0000000000000000002',
+ '156666.458923543', '-156666.458923543', '1', '-1', '1234',
+ '2E-12', '4E8', '3E-6', '3E-7', '4.1', '1E-1', '1E-2', '1E-3',
+ '1E-4', '1E-5', '1E-6', '1E-7', '1E-8']
+ for value in test_items:
+ float_table.insert().execute(floatcol=value)
+
except Exception, e:
raise e
foo = Foo()
foo.id = 1
foo.intarr = [1,2,3]
- sess.save(foo)
+ sess.add(foo)
sess.flush()
- sess.clear()
+ sess.expunge_all()
foo = sess.query(Foo).get(1)
self.assertEquals(foo.intarr, [1,2,3])
foo.intarr.append(4)
sess.flush()
- sess.clear()
+ sess.expunge_all()
foo = sess.query(Foo).get(1)
self.assertEquals(foo.intarr, [1,2,3,4])
foo.intarr = []
sess.flush()
- sess.clear()
+ sess.expunge_all()
self.assertEquals(foo.intarr, [])
foo.intarr = None
sess.flush()
- sess.clear()
+ sess.expunge_all()
self.assertEquals(foo.intarr, None)
# Errors in r4217:
foo = Foo()
foo.id = 2
- sess.save(foo)
+ sess.add(foo)
sess.flush()
class TimeStampTest(TestBase, AssertsExecutionResults):
def roundtrip(self, obj):
if obj not in self.session:
- self.session.save(obj)
+ self.session.add(obj)
self.session.flush()
id, type_ = obj.id, type(obj)
- self.session.clear()
+ self.session.expunge_all()
return self.session.query(type_).get(id)
def _test_sequence_ops(self):
def roundtrip(obj):
if obj not in session:
- session.save(obj)
+ session.add(obj)
session.flush()
id, type_ = obj.id, type(obj)
- session.clear()
+ session.expunge_all()
return session.query(type_).get(id)
p = Parent('p')
self.metadata.drop_all()
def roundtrip(self, obj):
- self.session.save(obj)
+ self.session.add(obj)
self.session.flush()
id, type_ = obj.id, type(obj)
- self.session.clear()
+ self.session.expunge_all()
return self.session.query(type_).get(id)
def test_lazy_list(self):
sess = create_session()
sess.add(u1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(sess.query(User).all(), [User(name='u1', addresses=[
Address(email='one'),
u1 = User(name='ed', addresses=[Address(email='abc'), Address(email='def'), Address(email='xyz')])
sess.add(u1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
self.assertEquals(sess.query(User).filter(User.name == 'ed').one(),
User(name='ed', addresses=[Address(email='xyz'), Address(email='def'), Address(email='abc')])
)
u1 = User(name='ed', addresses=[Address(email='abc'), Address(email='xyz'), Address(email='def')])
sess.add(u1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
self.assertEquals(sess.query(User).filter(User.name == 'ed').one(),
User(name='ed', addresses=[Address(email='abc'), Address(email='def'), Address(email='xyz')])
)
sess = create_session()
sess.add(u1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(sess.query(User).all(), [User(name='u1', addresses=[
Address(email='one'),
sess = create_session()
sess.add(u1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(sess.query(User).options(eagerload(User.addresses)).all(), [User(name='u1', addresses=[
Address(email='one'),
Address(email='two'),
sess = create_session()
sess.add(u1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
u = sess.query(User).filter(User.name == 'u1').one()
a = u.addresses
sess = create_session()
sess.add(u1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(sess.query(User).all(), [User(name='u1', addresses=[
Address(email='one'),
sess = create_session()
sess.add(u1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(sess.query(User).all(),
[User(name='u1', address_count=2, addresses=[
sess = create_session()
sess.add(u1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(sess.query(User).all(),
[User(name='u1', a='a', b='b')])
sess = create_session()
sess.add(u1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(sess.query(User).all(),
[User(name='u1', adr_count=2, addresses=[
sess = create_session()
sess.add(User(name='u1'))
sess.flush()
- sess.clear()
+ sess.expunge_all()
u1 = sess.query(User).filter(User.name == 'u1').one()
assert 'name' not in u1.__dict__
sess = create_session()
sess.add(u1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(sess.query(User).all(), [User(name='u1', addresses=[
Address(email='one'),
sess = create_session()
sess.add(u1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(sess.query(User).all(),
[User(name='u1', address_count=2, addresses=[
sess.add(c1)
sess.add(c2)
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_((sess.query(Company).
filter(Company.employees.of_type(Engineer).
# assert that the "id" column is available without a second load.
# this would be the symptom of the previous step not being correct.
- sess.clear()
+ sess.expunge_all()
def go():
assert sess.query(Manager).filter(Manager.name=='dogbert').one().id
self.assert_sql_count(testing.db, go, 1)
- sess.clear()
+ sess.expunge_all()
def go():
assert sess.query(Person).filter(Manager.name=='dogbert').one().id
self.assert_sql_count(testing.db, go, 1)
sess.add(c1)
sess.add(c2)
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_((sess.query(Person).
filter(Engineer.primary_language == 'cobol').first()),
sess.add(c1)
sess.add(c2)
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_((sess.query(Person).
filter(Engineer.primary_language == 'cobol').first()),
sess.add(c1)
sess.add(c2)
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_((sess.query(Person).with_polymorphic(Engineer).
filter(Engineer.primary_language == 'cobol').first()),
e3 =Engineer(name="vlad", primary_language=cobol)
sess.add_all([e1, e2, e3])
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_((sess.query(Person).
filter(Engineer.primary_language.has(Language.name=='cobol')).first()),
sess.add_all([e1, e2, m1, e3])
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(
sess.query(Person).order_by(Person.name).all(),
[
sess = create_session()
sess.add(u1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(sess.query(User).all(), [User(name='u1', addresses=[
Address(email='one'),
sess = create_session()
sess.add(u1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(sess.query(User).all(), [User(nom='u1', addresses=[
Address(email='one'),
sess = create_session()
sess.add(u1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(sess.query(User).all(), [User(name='u1', handles=[
IMHandle(network='blabber', handle='foo'),
eq_(u1.uc_name, 'SOMEUSER', u1.uc_name)
sess.add(u1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
rt = sess.query(User).filter(User.uc_name == 'SOMEUSER').one()
eq_(rt, u1)
- sess.clear()
+ sess.expunge_all()
rt = sess.query(User).filter(User.uc_name.startswith('SOMEUSE')).one()
eq_(rt, u1)
self.assert_(s1.bullets[3].position == 4)
session = create_session()
- session.save(s1)
+ session.add(s1)
session.flush()
id = s1.id
- session.clear()
+ session.expunge_all()
del s1
srt = session.query(Slide).get(id)
s1.bullets._reorder()
self.assert_(s1.bullets[4].position == 5)
session = create_session()
- session.save(s1)
+ session.add(s1)
session.flush()
id = s1.id
- session.clear()
+ session.expunge_all()
del s1
srt = session.query(Slide).get(id)
srt.bullets._raw_append(Bullet('raw2'))
srt.bullets[-1].position = 6
session.flush()
- session.clear()
+ session.expunge_all()
srt = session.query(Slide).get(id)
titles = ['s1/b1','s1/b2','s1/b100','s1/b4', 'raw', 'raw2']
self.assert_(s1.bullets[5].position == 5)
session = create_session()
- session.save(s1)
+ session.add(s1)
session.flush()
id = s1.id
- session.clear()
+ session.expunge_all()
del s1
srt = session.query(Slide).get(id)
self.assert_(s1.bullets[li] == b[bi])
session = create_session()
- session.save(s1)
+ session.add(s1)
session.flush()
id = s1.id
- session.clear()
+ session.expunge_all()
del s1
srt = session.query(Slide).get(id)
self.assert_(s1.bullets[2].position == 2)
session = create_session()
- session.save(s1)
+ session.add(s1)
session.flush()
new_bullet = Bullet('new 2')
id = s1.id
session.flush()
- session.clear()
+ session.expunge_all()
srt = session.query(Slide).get(id)
sess.add_all((item1, item2))
sess.flush()
saved = repr([item1, item2])
- sess.clear()
+ sess.expunge_all()
l = sess.query(Item).all()
loaded = repr(l)
eq_(saved, loaded)
item1.keywords.append(KeywordAssociation(red_keyword, 'new_red_assoc'))
sess.flush()
saved = repr([item1])
- sess.clear()
+ sess.expunge_all()
l = sess.query(Item).all()
loaded = repr(l)
eq_(saved, loaded)
sess.flush()
saved = repr([item1, item2])
- sess.clear()
+ sess.expunge_all()
l = sess.query(Item).all()
loaded = repr(l)
eq_(saved, loaded)
session = create_session()
session.add(p)
session.flush()
- session.clear()
+ session.expunge_all()
obj = session.query(Left).filter_by(data='l1').one()
d2.comments = [Comment('uid2', 'comment')]
sess.add_all((d, d2))
sess.flush()
- sess.clear()
+ sess.expunge_all()
# this eager load sets up an AliasedClauses for the "comment"
# relationship, then stores it in clauses_by_lead_mapper[mapper for
# Derived]
d = sess.query(Derived).get('uid1')
- sess.clear()
+ sess.expunge_all()
assert len([c for c in d.comments]) == 1
# this eager load sets up an AliasedClauses for the "comment"
# for DerivedII]. the bug was that the previous AliasedClause create
# prevented this population from occurring.
d2 = sess.query(DerivedII).get('uid2')
- sess.clear()
+ sess.expunge_all()
# object is not in the session; therefore the lazy load cant trigger
# here, eager load had to succeed
sess = create_session()
sess.add(d)
sess.flush()
- sess.clear()
+ sess.expunge_all()
x = sess.query(Design).get(1)
x.inheritedParts
company_id = c1.company_id
invoice_id = i1.invoice_id
- session.clear()
+ session.expunge_all()
c = session.query(Company).get(company_id)
- session.clear()
+ session.expunge_all()
i = session.query(Invoice).get(invoice_id)
eq_(c, i.company)
company_id = c1.company_id
- session.clear()
+ session.expunge_all()
a = session.query(Company).get(company_id)
session.flush()
invoice_id = i1.invoice_id
- session.clear()
+ session.expunge_all()
c = session.query(Company).get(company_id)
- session.clear()
+ session.expunge_all()
i = session.query(Invoice).get(invoice_id)
eq_(c, i.company)
session.add(acc1)
session.flush()
- session.clear()
+ session.expunge_all()
def go():
# load just the first Account. eager loading will actually load
sess = create_session(bind=bind)
assert sess.bind is bind
f = Foo()
- sess.save(f)
+ sess.add(f)
sess.flush()
assert sess.query(Foo).get(f.id) is f
finally:
Order(description='someotherorder')])
sess.add(u)
sess.flush()
- sess.clear()
+ sess.expunge_all()
u = sess.query(User).get(u.id)
eq_(u, User(name='jack',
u.orders=[Order(description="order 3"), Order(description="order 4")]
sess.flush()
- sess.clear()
+ sess.expunge_all()
u = sess.query(User).get(u.id)
eq_(u, User(name='jack',
Address(email_address="address2")])
sess.add(u)
sess.flush()
- sess.clear()
+ sess.expunge_all()
assert addresses.count().scalar() == 2
assert users.count().scalar() == 1
u2 = User(name='newuser', orders=[o])
sess.add(u2)
sess.flush()
- sess.clear()
+ sess.expunge_all()
assert users.count().scalar() == 1
assert orders.count().scalar() == 1
eq_(sess.query(User).all(),
assert u1 not in sess
assert o1 in sess
- sess.clear()
+ sess.expunge_all()
o1 = Order()
u1 = User(orders=[o1])
assert o1 not in sess
assert u1 in sess
- sess.clear()
+ sess.expunge_all()
u1 = User()
o1 = Order()
assert i1 in sess
assert k1 not in sess
- sess.clear()
+ sess.expunge_all()
i1 = Item()
k1 = Keyword()
jack = sess.query(User).filter_by(name="jack").one()
p = jack.pref
e = jack.pref.extra[0]
- sess.clear()
+ sess.expunge_all()
jack.pref = None
- sess.update(jack)
- sess.update(p)
- sess.update(e)
+ sess.add(jack)
+ sess.add(p)
+ sess.add(e)
assert p in sess
assert e in sess
sess.flush()
eq_(sess.query(T3).all(), [])
class M2MCascadeTest(_base.MappedTest):
+ """delete-orphan cascade is deprecated on many-to-many."""
+
def define_tables(self, metadata):
Table('a', metadata,
Column('id', Integer, primary_key=True),
- Column('data', String(30)))
+ Column('data', String(30)),
+ test_needs_fk=True
+ )
Table('b', metadata,
Column('id', Integer, primary_key=True),
- Column('data', String(30)))
+ Column('data', String(30)),
+ test_needs_fk=True
+ )
Table('atob', metadata,
Column('aid', Integer, ForeignKey('a.id')),
- Column('bid', Integer, ForeignKey('b.id')))
+ Column('bid', Integer, ForeignKey('b.id')),
+ test_needs_fk=True
+ )
Table('c', metadata,
Column('id', Integer, primary_key=True),
Column('data', String(30)),
- Column('bid', Integer, ForeignKey('b.id')))
+ Column('bid', Integer, ForeignKey('b.id')),
+ test_needs_fk=True
+ )
def setup_classes(self):
class A(_fixtures.Base):
class C(_fixtures.Base):
pass
+ @testing.emits_warning(".*not supported on a many-to-many")
@testing.resolve_artifact_names
def test_delete_orphan(self):
mapper(A, a, properties={
assert b.count().scalar() == 0
assert a.count().scalar() == 1
+ @testing.emits_warning(".*not supported on a many-to-many")
@testing.resolve_artifact_names
def test_delete_orphan_cascades(self):
mapper(A, a, properties={
assert a.count().scalar() == 1
assert c.count().scalar() == 0
+ @testing.emits_warning(".*not supported on a many-to-many")
@testing.resolve_artifact_names
def test_cascade_delete(self):
mapper(A, a, properties={
assert b.count().scalar() == 0
assert a.count().scalar() == 0
+ @testing.emits_warning(".*not supported on a many-to-many")
+ @testing.fails_on_everything_except('sqlite')
+ @testing.resolve_artifact_names
+ def test_this_doesnt_work(self):
+ """illustrates why cascade with m2m should not be supported
+ (i.e. many parents...)
+
+ """
+ mapper(A, a, properties={
+ 'bs':relation(B, secondary=atob, cascade="all, delete-orphan")
+ })
+ mapper(B, b)
+
+ sess = create_session()
+ b1 = B(data='b1')
+ a1 = A(data='a1', bs=[b1])
+ a2 = A(data='a2', bs=[b1])
+ sess.add(a1)
+ sess.add(a2)
+ sess.flush()
+
+ sess.delete(a1)
+
+ # this raises an integrity error on DBs that support FKs
+ sess.flush()
+
+ # still a row present !
+ assert atob.count().scalar() == 1
+
+ # but no bs !
+ assert b.count().scalar() == 0
+ assert a.count().scalar() == 1
+
class UnsavedOrphansTest(_base.MappedTest):
"""Pending entities that are orphans"""
u.addresses.remove(a1)
assert a1 in s
s.flush()
- s.clear()
+ s.expunge_all()
eq_(s.query(Address).all(), [Address(email_address='ad1')])
b1 = Business(description='business1', address=Address(street='address2'))
session.add_all((h1,b1))
session.flush()
- session.clear()
+ session.expunge_all()
eq_(session.query(Home).get(h1.id), Home(description='home1', address=Address(street='address1')))
eq_(session.query(Business).get(b1.id), Business(description='business1', address=Address(street='address2')))
sess.add(a1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(sess.query(A).get(a1.id),
A(name='a1', bs=[B(name='b1'), B(name='b2'), B(name='b3')]))
a1.bs[1].foo='b3modified'
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(sess.query(A).get(a1.id),
A(name='a1', bs=[B(name='b1'), B(name='b2'), B(name='b3')]))
session.add(p)
session.flush()
pid = p.id
- session.clear()
+ session.expunge_all()
p = session.query(Parent).get(pid)
Child('foo', 'newvalue'))
session.flush()
- session.clear()
+ session.expunge_all()
p = session.query(Parent).get(pid)
self.assert_(len(list(collections.collection_adapter(p.children))) == 2)
session.flush()
- session.clear()
+ session.expunge_all()
p = session.query(Parent).get(pid)
self.assert_(len(list(collections.collection_adapter(p.children))) == 2)
self.assert_(len(list(collections.collection_adapter(p.children))) == 1)
session.flush()
- session.clear()
+ session.expunge_all()
p = session.query(Parent).get(pid)
self.assert_(len(list(collections.collection_adapter(p.children))) == 1)
del p.children['bar']
self.assert_(len(list(collections.collection_adapter(p.children))) == 0)
session.flush()
- session.clear()
+ session.expunge_all()
p = session.query(Parent).get(pid)
self.assert_(len(list(collections.collection_adapter(p.children))) == 0)
session.add(p)
session.flush()
pid = p.id
- session.clear()
+ session.expunge_all()
p = session.query(Parent).get(pid)
Child('foo', '1', 'newvalue'))
session.flush()
- session.clear()
+ session.expunge_all()
p = session.query(Parent).get(pid)
sess = create_session()
sess.add(f)
sess.flush()
- sess.clear()
+ sess.expunge_all()
f = sess.query(Foo).get(f.col1)
assert len(list(f.bars)) == 2
f.bars.clear()
sess = create_session()
sess.add(f)
sess.flush()
- sess.clear()
+ sess.expunge_all()
f = sess.query(Foo).get(f.col1)
assert len(list(f.bars)) == 2
f.bars.clear()
sess = create_session()
sess.add(f)
sess.flush()
- sess.clear()
+ sess.expunge_all()
f = sess.query(Foo).get(f.col1)
assert len(list(f.bars)) == 2
col.append_with_event(Bar('b'))
f.bars['a'] = Bar('a')
sess.flush()
- sess.clear()
+ sess.expunge_all()
f = sess.query(Foo).get(f.col1)
assert len(list(f.bars)) == 2
sess = create_session()
sess.add(p1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
p2 = sess.query(Parent).get(p1.col1)
o = list(p2.children)
sess = create_session()
sess.add(c1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
c1 = sess.query(C1).get(c1.c1)
c2 = C1()
c2.parent = c1
s = create_session()
s.add(t1)
s.flush()
- s.clear()
+ s.expunge_all()
t = s.query(TT).filter_by(id=t1.id).one()
eq_(t.children[0].parent_uuid, t1.uuid)
t1.children.append(t2)
s.add(t1)
s.flush()
- s.clear()
+ s.expunge_all()
t = s.query(TT).filter_by(id=t2.id).one()
eq_(t.uuid, t2.uuid)
c1.child1_data = "qwerty"
session.add(c1)
session.flush()
- session.clear()
+ session.expunge_all()
c1 = session.query(Child1).filter_by(child1_data="qwerty").one()
c2 = Child2()
# to fire off anyway
session.add(f2)
session.flush()
- session.clear()
+ session.expunge_all()
f1 = session.query(A).get(f1.id)
f2 = session.query(A).get(f2.id)
from sqlalchemy.orm import attributes
self.assertEquals(attributes.get_history(attributes.instance_state(u1), 'addresses'), ([], [Address(email_address='lala@hoho.com')], []))
- sess.clear()
+ sess.expunge_all()
# test the test fixture a little bit
assert User(name='jack', addresses=[Address(email_address='wrong')]) != sess.query(User).first()
sess.delete(u.addresses[3])
assert [Address(email_address='a'), Address(email_address='b'), Address(email_address='d')] == list(u.addresses)
- sess.clear()
+ sess.expunge_all()
u = sess.query(User).get(u.id)
sess.delete(u)
sess.delete(u.addresses[3])
assert [Address(email_address='a'), Address(email_address='b'), Address(email_address='d')] == list(u.addresses)
- sess.clear()
+ sess.expunge_all()
u = sess.query(User).get(u.id)
sess.delete(u)
address.user = user
session.add(user)
session.flush()
- session.clear()
+ session.expunge_all()
def query1():
session = create_session(testing.db)
sess.query(User).all()
m.add_property("addresses", relation(mapper(Address, addresses)))
- sess.clear()
+ sess.expunge_all()
def go():
eq_(
[User(id=7, addresses=[Address(id=1, email_address='jack@bean.com')])],
sess.query(Address).filter(Address.id.in_([1, 4, 5])),
sess.query(Address).filter(Address.id.in_([1, 4, 5])).limit(3)
]:
- sess.clear()
+ sess.expunge_all()
eq_(q.all(),
[Address(id=1, user=User(id=7)),
Address(id=4, user=User(id=8)),
sess.query(User).filter(User.id==7),
sess.query(User).filter(User.id==7).limit(1)
]:
- sess.clear()
+ sess.expunge_all()
eq_(q.all(),
[User(id=7, addresses=[Address(id=1)])]
)
- sess.clear()
+ sess.expunge_all()
u = sess.query(User).get(7)
def go():
assert u.addresses[0].user_id==7
'dingalings':relation(Dingaling, lazy=False)})
mapper(Dingaling, dingalings, properties={
'address_id':deferred(dingalings.c.address_id)})
- sess.clear()
+ sess.expunge_all()
def go():
u = sess.query(User).get(8)
eq_(User(id=8,
self.assertEquals(o1.address, None)
self.assert_sql_count(testing.db, go, 2)
- sess.clear()
+ sess.expunge_all()
def go():
o1 = sess.query(Order).filter(Order.id==5).one()
self.assertEquals(o1.address, None)
eq_(ret, self._assert_result())
self.assert_sql_count(testing.db, go, 6)
- sess.clear()
+ sess.expunge_all()
def go():
ret = sess.query(User, oalias).options(eagerload('addresses'), eagerload(oalias.items)).join(('orders', oalias)).order_by(User.id, oalias.id).all()
eq_(ret, self._assert_result())
n1.children[1].append(Node(data='n121'))
n1.children[1].append(Node(data='n122'))
n1.children[1].append(Node(data='n123'))
- sess.save(n1)
+ sess.add(n1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
def go():
d = sess.query(Node).filter_by(data='n1').first()
assert Node(data='n1', children=[
n1.children[1].append(Node(data='n121'))
n1.children[1].append(Node(data='n122'))
n1.children[1].append(Node(data='n123'))
- sess.save(n1)
+ sess.add(n1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
# eager load with join depth 1. when eager load of 'n1' hits the
# children of 'n12', no columns are present, eager loader degrades to
n1 = Node(data='n1')
n1.append(Node(data='n11'))
n1.append(Node(data='n12'))
- sess.save(n1)
+ sess.add(n1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
def go():
self.assertEquals(
)
self.assert_sql_count(testing.db, go, 4)
- sess.clear()
+ sess.expunge_all()
def go():
assert Node(data='n1', children=[Node(data='n11'), Node(data='n12')]) == sess.query(Node).options(undefer('data')).order_by(Node.id).first()
self.assert_sql_count(testing.db, go, 3)
- sess.clear()
+ sess.expunge_all()
def go():
assert Node(data='n1', children=[Node(data='n11'), Node(data='n12')]) == sess.query(Node).options(undefer('data'), undefer('children.data')).first()
n1.children[1].append(Node(data='n121'))
n1.children[1].append(Node(data='n122'))
n1.children[1].append(Node(data='n123'))
- sess.save(n1)
+ sess.add(n1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
def go():
d = sess.query(Node).filter_by(data='n1').options(eagerload('children.children')).first()
assert Node(data='n1', children=[
n1.children[1].append(Node(data='n121'))
n1.children[1].append(Node(data='n122'))
n1.children[1].append(Node(data='n123'))
- sess.save(n1)
+ sess.add(n1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
def go():
d = sess.query(Node).filter_by(data='n1').first()
assert Node(data='n1', children=[
w1 = Widget(name=u'w1')
w2 = Widget(name=u'w2')
w1.children.append(w2)
- sess.save(w1)
+ sess.add(w1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
assert [Widget(name='w1', children=[Widget(name='w2')])] == sess.query(Widget).filter(Widget.name==u'w1').all()
})
session = create_session()
- session.save(User(name='joe', tags=[Tag(score1=5.0, score2=3.0), Tag(score1=55.0, score2=1.0)]))
- session.save(User(name='bar', tags=[Tag(score1=5.0, score2=4.0), Tag(score1=50.0, score2=1.0), Tag(score1=15.0, score2=2.0)]))
+ session.add(User(name='joe', tags=[Tag(score1=5.0, score2=3.0), Tag(score1=55.0, score2=1.0)]))
+ session.add(User(name='bar', tags=[Tag(score1=5.0, score2=4.0), Tag(score1=50.0, score2=1.0), Tag(score1=15.0, score2=2.0)]))
session.flush()
- session.clear()
+ session.expunge_all()
for user in session.query(User).all():
eq_(user.query_score, user.prop_score)
mapper(User, users)
s = create_session()
u = s.query(User).get(7)
- s.clear()
+ s.expunge_all()
self.assertRaisesMessage(sa.exc.InvalidRequestError, r"is not persistent within this Session", s.expire, u)
u.name = 'somenewname'
self.assert_sql_count(testing.db, go, 0)
sess.flush()
- sess.clear()
+ sess.expunge_all()
assert sess.query(User).get(7).name == 'somenewname'
@testing.resolve_artifact_names
sa.orm.clear_mappers()
mapper(Order, orders)
- sess.clear()
+ sess.expunge_all()
# same tests, using deferred at the options level
o = sess.query(Order).options(sa.orm.defer('description')).get(3)
mapper(User, users)
s = create_session()
u = s.query(User).get(7)
- s.clear()
+ s.expunge_all()
self.assertRaisesMessage(sa.exc.InvalidRequestError, r"is not persistent within this Session", lambda: s.refresh(u))
@testing.resolve_artifact_names
s.add(u)
s.flush()
- s.clear()
+ s.expunge_all()
u = s.query(User).filter(User.name=='Justin').one()
s.expire(u)
#print "APPENDING", parent.__class__.__name__ , "TO", child.__class__.__name__
- sess.save(parent_obj)
+ sess.add(parent_obj)
parent_obj.collection.append(child_obj)
if direction == ONETOMANY:
child2 = child_class('child2')
parent_obj.collection.append(child2)
- sess.save(child2)
+ sess.add(child2)
elif direction == MANYTOONE:
parent2 = parent_class('parent2')
parent2.collection.append(child_obj)
- sess.save(parent2)
- sess.save(somea)
- sess.save(someb)
- sess.save(somec)
+ sess.add(parent2)
+ sess.add(somea)
+ sess.add(someb)
+ sess.add(somec)
sess.flush()
- sess.clear()
+ sess.expunge_all()
# assert result via direct get() of parent object
result = sess.query(parent_class).get(parent_obj.id)
assert result2.id == parent2.id
assert result2.collection[0].id == child_obj.id
- sess.clear()
+ sess.expunge_all()
# assert result via polymorphic load of parent object
result = sess.query(A).filter_by(id=parent_obj.id).one()
sess = create_session()
for x in (a1, b1, b2, b3, c1, c2, c3):
- sess.save(x)
+ sess.add(x)
sess.flush()
- sess.clear()
+ sess.expunge_all()
#for obj in sess.query(A).all():
# print obj
b1 = Blub("blub #1")
b2 = Blub("blub #2")
f = Foo("foo #1")
- sess.save(b1)
- sess.save(b2)
- sess.save(f)
+ sess.add(b1)
+ sess.add(b2)
+ sess.add(f)
b1.parent_foo = f
b2.parent_foo = f
sess.flush()
compare = ','.join([repr(b1), repr(b2), repr(b1.parent_foo), repr(b2.parent_foo)])
- sess.clear()
+ sess.expunge_all()
l = sess.query(Blub).all()
result = ','.join([repr(l[0]), repr(l[1]), repr(l[0].parent_foo), repr(l[1].parent_foo)])
print compare
mapper(Bar, inherits=Foo, polymorphic_identity=0)
sess = create_session()
f1 = Bar()
- sess.save(f1)
+ sess.add(f1)
sess.flush()
assert f1.type == 0
- sess.clear()
+ sess.expunge_all()
assert isinstance(sess.query(Foo).one(), Bar)
class PolymorphicSynonymTest(ORMTest):
sess = create_session()
at1 = T1(info='at1')
at2 = T2(info='at2', data='t2 data')
- sess.save(at1)
- sess.save(at2)
+ sess.add(at1)
+ sess.add(at2)
sess.flush()
- sess.clear()
+ sess.expunge_all()
self.assertEquals(sess.query(T2).filter(T2.info=='at2').one(), at2)
self.assertEquals(at2.info, "THE INFO IS:at2")
t4_1 = T4(data='t4')
t3_1.t4s.append(t4_1)
- sess.save(t1_1)
+ sess.add(t1_1)
assert t4_1 in sess.new
f = Foo()
b = Bar()
bl = Blub()
- sess.save(f)
- sess.save(b)
- sess.save(bl)
+ sess.add(f)
+ sess.add(b)
+ sess.add(bl)
sess.flush()
if polymorphic:
admin_mapper = mapper(Admin, admins, inherits=user_mapper)
sess = create_session()
adminrole = Role()
- sess.save(adminrole)
+ sess.add(adminrole)
sess.flush()
# create an Admin, and append a Role. the dependency processors
a = Admin()
a.roles.append(adminrole)
a.password = 'admin'
- sess.save(a)
+ sess.add(a)
sess.flush()
assert user_roles.count().scalar() == 1
adminrole = Role('admin')
sess = create_session()
- sess.save(adminrole)
+ sess.add(adminrole)
sess.flush()
# create admin user
a = Admin(email='tim', password='admin')
a.roles.append(adminrole)
- sess.save(a)
+ sess.add(a)
sess.flush()
a.password = 'sadmin'
b1 = Base(value='b1')
s1 = Sub(value='sub1', subdata='some subdata')
- sess.save(b1)
- sess.save(s1)
+ sess.add(b1)
+ sess.add(s1)
sess.flush()
b1 = Base(value='b1')
s1 = Sub(value='sub1', subdata='some subdata')
s2 = Sub(value='sub2', subdata='some other subdata')
- sess.save(b1)
- sess.save(s1)
- sess.save(s2)
+ sess.add(b1)
+ sess.add(s1)
+ sess.add(s2)
sess.flush()
session = create_session()
a = A(data1='a1')
- session.save(a)
+ session.add(a)
b = B(data1='b1', data2='b2')
- session.save(b)
+ session.add(b)
c = C(data1='c1', data2='c2', data3='c3')
- session.save(c)
+ session.add(c)
session.flush()
- session.clear()
+ session.expunge_all()
assert len(session.query(A).all()) == 3
assert len(session.query(B).all()) == 2
sess.add_all([s1, b1])
sess.flush()
- sess.clear()
+ sess.expunge_all()
assert sess.query(Base).get(b1.base_id).subdata == "this is base"
assert sess.query(Sub).get(s1.base_id).subdata == "this is sub"
sess.add_all([s1, b1])
sess.flush()
- sess.clear()
+ sess.expunge_all()
assert sess.query(Base).get(b1.base_id).data == "this is base"
assert sess.query(Sub).get(s1.base_id).data == "this is base"
s1 = Sub()
s1.data = 's1data'
s1.sub = 's1sub'
- sess.save(s1)
+ sess.add(s1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
# load s1 via Base. s1.id won't populate since it's relative to
# the "sub" table. The optimized load kicks in and tries to
concrete=True, polymorphic_identity='engineer')
session = create_session()
- session.save(Manager('Tom', 'knows how to manage things'))
- session.save(Engineer('Kurt', 'knows how to hack'))
+ session.add(Manager('Tom', 'knows how to manage things'))
+ session.add(Engineer('Kurt', 'knows how to hack'))
session.flush()
- session.clear()
+ session.expunge_all()
assert set([repr(x) for x in session.query(Employee)]) == set(["Engineer Kurt knows how to hack", "Manager Tom knows how to manage things"])
assert set([repr(x) for x in session.query(Manager)]) == set(["Manager Tom knows how to manage things"])
self.assertEquals(hacker.nickname, "Badass")
self.assert_sql_count(testing.db, go, 0)
- session.clear()
+ session.expunge_all()
assert repr(session.query(Employee).filter(Employee.name=='Tom').one()) == "Manager Tom knows how to manage things"
assert repr(session.query(Manager).filter(Manager.name=='Tom').one()) == "Manager Tom knows how to manage things"
self.assertEquals(hacker.nickname, "Badass")
self.assert_sql_count(testing.db, go, 0)
- session.clear()
+ session.expunge_all()
# check that we aren't getting a cartesian product in the raw SQL.
# this requires that Engineer's polymorphic discriminator is not rendered
c = Company()
c.employees.append(Manager('Tom', 'knows how to manage things'))
c.employees.append(Engineer('Kurt', 'knows how to hack'))
- session.save(c)
+ session.add(c)
session.flush()
- session.clear()
+ session.expunge_all()
def go():
c2 = session.query(Company).get(c.id)
assert set([repr(x) for x in c2.employees]) == set(["Engineer Kurt knows how to hack", "Manager Tom knows how to manage things"])
self.assert_sql_count(testing.db, go, 2)
- session.clear()
+ session.expunge_all()
def go():
c2 = session.query(Company).options(eagerload(Company.employees)).get(c.id)
assert set([repr(x) for x in c2.employees]) == set(["Engineer Kurt knows how to hack", "Manager Tom knows how to manage things"])
def _set_name(self, name):
session = create_session()
s = session.query(LocationName).filter(LocationName.name==name).first()
- session.clear()
+ session.expunge_all()
if s is not None:
self._name = s
page = ClassifiedPage(magazine=magazine,page_no=1)
page2 = MagazinePage(magazine=magazine,page_no=2)
page3 = ClassifiedPage(magazine=magazine,page_no=3)
- session.save(pub)
+ session.add(pub)
session.flush()
print [x for x in session]
- session.clear()
+ session.expunge_all()
session.flush()
- session.clear()
+ session.expunge_all()
p = session.query(Publication).filter(Publication.name=="Test").one()
print p.issues[0].locations[0].magazine.pages
g = Group(name="group1")
g.users.append(User(name="user1", password="pw", email="foo@bar.com", login_id="lg1"))
sess = create_session()
- sess.save(g)
+ sess.add(g)
sess.flush()
# TODO: put an assertion
print class_mapper(Bar).primary_key
b = Bar('somedata')
sess = create_session()
- sess.save(b)
+ sess.add(b)
sess.flush()
- sess.clear()
+ sess.expunge_all()
# test that "bar.bid" does not need to be referenced in a get
# (ticket 185)
sess = create_session()
b = Bar('barfoo')
- sess.save(b)
+ sess.add(b)
sess.flush()
f1 = Foo('subfoo1')
b.foos.append(f2)
sess.flush()
- sess.clear()
+ sess.expunge_all()
l = sess.query(Bar).all()
print l[0]
sess = create_session()
b = Bar('bar #1')
- sess.save(b)
+ sess.add(b)
b.foos.append(Foo("foo #1"))
b.foos.append(Foo("foo #2"))
sess.flush()
compare = repr(b) + repr(sorted([repr(o) for o in b.foos]))
- sess.clear()
+ sess.expunge_all()
l = sess.query(Bar).all()
print repr(l[0]) + repr(l[0].foos)
found = repr(l[0]) + repr(sorted([repr(o) for o in l[0].foos]))
b2 = Bar("bar #2")
bl1 = Blub("blub #1")
for o in (f1, b1, b2, bl1):
- sess.save(o)
+ sess.add(o)
bl1.foos.append(f1)
bl1.bars.append(b2)
sess.flush()
compare = repr(bl1)
blubid = bl1.id
- sess.clear()
+ sess.expunge_all()
l = sess.query(Blub).all()
print l
self.assert_(repr(l[0]) == compare)
- sess.clear()
+ sess.expunge_all()
x = sess.query(Blub).filter_by(id=blubid).one()
print x
self.assert_(repr(x) == compare)
obj = newobj
# save to DB
- sess.save(t)
+ sess.add(t)
sess.flush()
# string version of the saved list
# clear and query forwards
- sess.clear()
+ sess.expunge_all()
node = sess.query(Table1).filter(Table1.id==t.id).first()
assertlist = []
while (node):
forwards = repr(assertlist)
# clear and query backwards
- sess.clear()
+ sess.expunge_all()
node = sess.query(Table1).filter(Table1.id==obj.id).first()
assertlist = []
while (node):
c.employees.append(Person(status='HHH', name='joesmith'))
c.employees.append(Engineer(status='CGG', engineer_name='engineer2', primary_language='python', name='wally'))
c.employees.append(Manager(status='ABA', manager_name='manager2', name='jsmith'))
- session.save(c)
+ session.add(c)
session.flush()
- session.clear()
+ session.expunge_all()
self.assertEquals(session.query(Company).get(c.company_id), c)
class RelationToSubclassTest(PolymorphTest):
c = Company(name='company1')
c.managers.append(Manager(status='AAB', manager_name='manager1', name='pointy haired boss'))
- sess.save(c)
+ sess.add(c)
sess.flush()
- sess.clear()
+ sess.expunge_all()
self.assertEquals(sess.query(Company).filter_by(company_id=c.company_id).one(), c)
assert c.managers[0].company is c
session = create_session()
c = Company(name='company1')
c.employees = employees
- session.save(c)
+ session.add(c)
session.flush()
- session.clear()
+ session.expunge_all()
self.assertEquals(session.query(Person).get(dilbert.person_id), dilbert)
- session.clear()
+ session.expunge_all()
self.assertEquals(session.query(Person).filter(Person.person_id==dilbert.person_id).one(), dilbert)
- session.clear()
+ session.expunge_all()
def go():
cc = session.query(Company).get(c.company_id)
dilbert.engineer_name = 'hes dibert!'
session.flush()
- session.clear()
+ session.expunge_all()
def go():
session.query(Person).filter(getattr(Person, person_attribute_name)=='dilbert').first()
self.assert_sql_count(testing.db, go, 1)
- session.clear()
+ session.expunge_all()
dilbert = session.query(Person).filter(getattr(Person, person_attribute_name)=='dilbert').first()
def go():
# assert that only primary table is queried for already-present-in-session
# test standalone orphans
daboss = Boss(status='BBB', manager_name='boss', golf_swing='fore', **{person_attribute_name:'daboss'})
- session.save(daboss)
+ session.add(daboss)
self.assertRaises(orm_exc.FlushError, session.flush)
c = session.query(Company).first()
daboss.company = c
manager_list = [e for e in c.employees if isinstance(e, Manager)]
session.flush()
- session.clear()
+ session.expunge_all()
self.assertEquals(session.query(Manager).order_by(Manager.person_id).all(), manager_list)
c = session.query(Company).first()
p = Person(name='some person')
m = Manager(name='some manager')
p.manager = m
- session.save(p)
+ session.add(p)
session.flush()
- session.clear()
+ session.expunge_all()
p = session.query(Person).get(p.person_id)
m = session.query(Manager).get(m.person_id)
p = Person(name='some person')
m = Manager(name='some manager')
m.employee = p
- session.save(m)
+ session.add(m)
session.flush()
- session.clear()
+ session.expunge_all()
p = session.query(Person).get(p.person_id)
m = session.query(Manager).get(m.person_id)
m.colleague = p
if usedata:
m.data = Data('ms data')
- sess.save(m)
+ sess.add(m)
sess.flush()
- sess.clear()
+ sess.expunge_all()
p = sess.query(Person).get(p.person_id)
m = sess.query(Manager).get(m.person_id)
print p
p.data = Data('ps data')
m.data = Data('ms data')
- sess.save(m)
- sess.save(p)
+ sess.add(m)
+ sess.add(p)
sess.flush()
- sess.clear()
+ sess.expunge_all()
p = sess.query(Person).get(p.person_id)
p2 = sess.query(Person).get(p2.person_id)
p3 = sess.query(Person).get(p3.person_id)
# creating 4 managers named M1 to M4
for i in range(1,5):
- session.save(Manager(name="M%d" % i,longer_status="YYYYYYYYY"))
+ session.add(Manager(name="M%d" % i,longer_status="YYYYYYYYY"))
# creating 4 engineers named E1 to E4
for i in range(1,5):
- session.save(Engineer(name="E%d" % i,status="X"))
+ session.add(Engineer(name="E%d" % i,status="X"))
session.flush()
manager3 = session.query(Manager).filter(Manager.name=="M3").first()
car1 = Car(employee=engineer4)
- session.save(car1)
+ session.add(car1)
car2 = Car(employee=manager3)
- session.save(car2)
+ session.add(car2)
session.flush()
- session.clear()
+ session.expunge_all()
def go():
testcar = session.query(Car).options(eagerload('employee')).get(car1.car_id)
assert str(usingGet) == "Engineer E4, status X"
assert str(usingProperty) == "Engineer E4, status X"
- session.clear()
+ session.expunge_all()
print "-----------------------------------------------------------------"
# and now for the lightning round, eager !
assert str(testcar.employee) == "Engineer E4, status X"
self.assert_sql_count(testing.db, go, 1)
- session.clear()
+ session.expunge_all()
s = session.query(Car)
c = s.join("employee").filter(Person.name=="E4")[0]
assert c.car_id==car1.car_id
car1 = Car()
car2 = Car()
car2.manager = Manager()
- sess.save(car1)
- sess.save(car2)
+ sess.add(car1)
+ sess.add(car2)
sess.flush()
- sess.clear()
+ sess.expunge_all()
carlist = sess.query(Car).all()
assert carlist[0].manager is None
m = Manager(name='manager1')
m2 =Manager(name='manager2')
m.colleague = m2
- sess.save(m)
+ sess.add(m)
sess.flush()
- sess.clear()
+ sess.expunge_all()
m = sess.query(Manager).get(m.person_id)
m2 = sess.query(Manager).get(m2.person_id)
assert m.colleague is m2
car=Car()
else:
car=Offraod_Car()
- session.save(Manager(name="M%d" % i,category="YYYYYYYYY",car=car))
- session.save(Engineer(name="E%d" % i,field="X",car=car))
+ session.add(Manager(name="M%d" % i,category="YYYYYYYYY",car=car))
+ session.add(Engineer(name="E%d" % i,field="X",car=car))
session.flush()
- session.clear()
+ session.expunge_all()
r = session.query(Person).all()
for p in r:
active = Status(name="active")
dead = Status(name="dead")
- session.save(active)
- session.save(dead)
+ session.add(active)
+ session.add(dead)
session.flush()
# TODO: we haven't created assertions for all the data combinations created here
st=active
else:
st=dead
- session.save(Manager(name="M%d" % i,category="YYYYYYYYY",status=st))
- session.save(Engineer(name="E%d" % i,field="X",status=st))
+ session.add(Manager(name="M%d" % i,category="YYYYYYYYY",status=st))
+ session.add(Engineer(name="E%d" % i,field="X",status=st))
session.flush()
# create 2 cars for E4, one active and one dead
car1 = Car(employee=engineer4,status=active)
car2 = Car(employee=engineer4,status=dead)
- session.save(car1)
- session.save(car2)
+ session.add(car1)
+ session.add(car2)
session.flush()
# this particular adapt used to cause a recursion overflow;
c = Manager().set( name= 'head', machine= 'fast', duties= 'many')
session = create_session()
- session.save(a)
- session.save(b)
- session.save(c)
+ session.add(a)
+ session.add(b)
+ session.add(c)
session.flush()
assert set(session.query(Employee).all()) == set([a,b,c])
assert set(session.query( Engineer).all()) == set([b,c])
ot1 = T1()
ot2 = T2()
sess = create_session()
- sess.save(ot1)
- sess.save(ot2)
+ sess.add(ot1)
+ sess.add(ot2)
sess.flush()
- sess.clear()
+ sess.expunge_all()
# query using get(), using only one value. this requires that the select_table mapper
# has the same single-col primary key.
ot1 = T1()
ot2 = T2()
sess = create_session()
- sess.save(ot1)
- sess.save(ot2)
+ sess.add(ot1)
+ sess.add(ot2)
sess.flush()
- sess.clear()
+ sess.expunge_all()
# query using get(), using only one value. this requires that the select_table mapper
# has the same single-col primary key.
session = create_session()
bob = Employee()
- session.save(bob)
+ session.add(bob)
tag = Tag('crazy')
bob.tags.append(tag)
bob.tags.append(tag)
session.flush()
- session.clear()
+ session.expunge_all()
# query from Employee with limit, query needs to apply eager limiting subquery
instance = session.query(Employee).filter_by(id=1).limit(1).first()
assert len(instance.tags) == 2
c = C(cdata='c1', adata='a1', b=B(data='c'))
d = D(cdata='c2', adata='a2', ddata='d2', b=B(data='d'))
sess = create_session()
- sess.save(c)
- sess.save(d)
+ sess.add(c)
+ sess.add(d)
sess.flush()
- sess.clear()
+ sess.expunge_all()
self.assertEquals(sess.query(A).all(), [C(cdata='c1', adata='a1'), D(cdata='c2', adata='a2', ddata='d2')])
if __name__ == "__main__":
d1 = Detail(name='d1')
a1.specification.append(SpecLine(slave=d1))
- session.save(a1)
+ session.add(a1)
orig = repr(a1)
session.flush()
- session.clear()
+ session.expunge_all()
a1 = session.query(Product).filter_by(name='a1').one()
new = repr(a1)
s = SpecLine(slave=Product(name='p1'))
s2 = SpecLine(slave=Detail(name='d1'))
- session.save(s)
- session.save(s2)
+ session.add(s)
+ session.add(s2)
orig = repr([s, s2])
session.flush()
- session.clear()
+ session.expunge_all()
new = repr(session.query(SpecLine).all())
print orig
print new
a1.specification.append(SpecLine(slave=Detail(name='d1')))
a1.documents.append(Document('doc1'))
a1.documents.append(RasterDocument('doc2'))
- session.save(a1)
+ session.add(a1)
orig = repr(a1)
session.flush()
- session.clear()
+ session.expunge_all()
a1 = session.query(Product).filter_by(name='a1').one()
new = repr(a1)
a1 = Assembly(name='a1')
a1.documents.append(RasterDocument('doc2'))
- session.save(a1)
+ session.add(a1)
orig = repr(a1)
session.flush()
- session.clear()
+ session.expunge_all()
a1 = session.query(Product).filter_by(name='a1').one()
new = repr(a1)
del a1.documents[0]
session.flush()
- session.clear()
+ session.expunge_all()
a1 = session.query(Product).filter_by(name='a1').one()
assert len(session.query(Document).all()) == 0
a1.specification.append(SpecLine(slave=Detail(name='d1')))
a1.documents.append(Document('doc1'))
a1.documents.append(RasterDocument('doc2'))
- session.save(a1)
+ session.add(a1)
orig = repr(a1)
session.flush()
- session.clear()
+ session.expunge_all()
a1 = session.query(Product).filter_by(name='a1').one()
new = repr(a1)
c2.employees = [e3]
sess = create_session()
- sess.save(c1)
- sess.save(c2)
+ sess.add(c1)
+ sess.add(c2)
sess.flush()
- sess.clear()
+ sess.expunge_all()
all_employees = [e1, e2, b1, m1, e3]
c1_employees = [e1, e2, b1, m1]
sess = create_session()
for aliased in (True, False):
- sess.clear()
+ sess.expunge_all()
self.assertEquals(sess.query(Person).with_polymorphic(Manager).join('paperwork', aliased=aliased).filter(Paperwork.description.like('%review%')).all(), [b1, m1])
- sess.clear()
+ sess.expunge_all()
self.assertEquals(sess.query(Person).with_polymorphic([Manager, Engineer]).join('paperwork', aliased=aliased).filter(Paperwork.description.like('%#2%')).all(), [e1, m1])
- sess.clear()
+ sess.expunge_all()
self.assertEquals(sess.query(Person).with_polymorphic([Manager, Engineer]).join('paperwork', aliased=aliased).filter(Person.name.like('%dog%')).filter(Paperwork.description.like('%#2%')).all(), [m1])
def test_join_to_polymorphic(self):
self.assertEquals(sess.query(Person).with_polymorphic(Engineer).filter(Engineer.primary_language=='java').all(), emps_without_relations[0:1])
self.assert_sql_count(testing.db, go, 1)
- sess.clear()
+ sess.expunge_all()
def go():
self.assertEquals(sess.query(Person).with_polymorphic('*').all(), emps_without_relations)
self.assert_sql_count(testing.db, go, 1)
- sess.clear()
+ sess.expunge_all()
def go():
self.assertEquals(sess.query(Person).with_polymorphic(Engineer).all(), emps_without_relations)
self.assert_sql_count(testing.db, go, 3)
- sess.clear()
+ sess.expunge_all()
def go():
self.assertEquals(sess.query(Person).with_polymorphic(Engineer, people.outerjoin(engineers)).all(), emps_without_relations)
self.assert_sql_count(testing.db, go, 3)
- sess.clear()
+ sess.expunge_all()
def go():
# limit the polymorphic join down to just "Person", overriding select_table
self.assertEquals(sess.query(Person).with_polymorphic(Person).all(), emps_without_relations)
p1 = Person(name='dogbert')
e1 = Engineer(name='dilbert', primary_language='java', reports_to=p1)
sess = create_session()
- sess.save(p1)
- sess.save(e1)
+ sess.add(p1)
+ sess.add(e1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
self.assertEquals(sess.query(Engineer).filter(Engineer.reports_to.has(Person.name=='dogbert')).first(), Engineer(name='dilbert'))
p1 = Person(name='dogbert')
e1 = Engineer(name='dilbert', primary_language='java', reports_to=p1)
sess = create_session()
- sess.save(p1)
- sess.save(e1)
+ sess.add(p1)
+ sess.add(e1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
self.assertEquals(
sess.query(Engineer).join('reports_to', aliased=True).filter(Person.name=='dogbert').first(),
m1 = Manager(name='dogbert')
e1 = Engineer(name='dilbert', primary_language='java', reports_to=m1)
sess = create_session()
- sess.save(m1)
- sess.save(e1)
+ sess.add(m1)
+ sess.add(e1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
self.assertEquals(sess.query(Engineer).filter(Engineer.reports_to.has(Manager.name=='dogbert')).first(), Engineer(name='dilbert'))
m1 = Manager(name='dogbert')
e1 = Engineer(name='dilbert', primary_language='java', reports_to=m1)
sess = create_session()
- sess.save(m1)
- sess.save(e1)
+ sess.add(m1)
+ sess.add(e1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
self.assertEquals(
sess.query(Engineer).join('reports_to', aliased=True).filter(Manager.name=='dogbert').first(),
org2 = Organization(name='org2', engineers=[e3, e4])
sess = create_session()
- sess.save(org1)
- sess.save(org2)
+ sess.add(org1)
+ sess.add(org2)
sess.flush()
def test_not_contains(self):
e2 = JuniorEngineer(name='Ed', engineer_info='oh that ed', company=c1)
sess.add_all([c1, c2, m1, m2, e1, e2])
sess.commit()
- sess.clear()
+ sess.expunge_all()
self.assertEquals(
sess.query(Company).filter(Company.employees.of_type(JuniorEngineer).any()).all(),
[
self.assertEquals(c1.engineers, [e2])
self.assertEquals(c2.engineers, [e1])
- sess.clear()
+ sess.expunge_all()
self.assertEquals(sess.query(Company).order_by(Company.name).all(),
[
Company(name='c1', engineers=[JuniorEngineer(name='Ed')]),
)
# eager load join should limit to only "Engineer"
- sess.clear()
+ sess.expunge_all()
self.assertEquals(sess.query(Company).options(eagerload('engineers')).order_by(Company.name).all(),
[
Company(name='c1', engineers=[JuniorEngineer(name='Ed')]),
)
# join() to Company.engineers, Employee as the requested entity
- sess.clear()
+ sess.expunge_all()
self.assertEquals(sess.query(Company, Employee).join(Company.engineers).order_by(Company.name).all(),
[
(Company(name='c1'), JuniorEngineer(name='Ed')),
# join() to Company.engineers, Engineer as the requested entity.
# this actually applies the IN criterion twice which is less than ideal.
- sess.clear()
+ sess.expunge_all()
self.assertEquals(sess.query(Company, Engineer).join(Company.engineers).order_by(Company.name).all(),
[
(Company(name='c1'), JuniorEngineer(name='Ed')),
)
# join() to Company.engineers without any Employee/Engineer entity
- sess.clear()
+ sess.expunge_all()
self.assertEquals(sess.query(Company).join(Company.engineers).filter(Engineer.name.in_(['Tom', 'Kurt'])).all(),
[
Company(name='c2')
# section to "inheritance" laying out all the various behaviors Query has.
@testing.fails_on_everything_except()
def go():
- sess.clear()
+ sess.expunge_all()
self.assertEquals(sess.query(Company).\
filter(Company.company_id==Engineer.company_id).filter(Engineer.name.in_(['Tom', 'Kurt'])).all(),
[
mapper(Manager, inherits=Employee,polymorphic_identity='manager')
sess = create_session()
- sess.save(Person(name='p1'))
- sess.save(Employee(name='e1', employee_data='ed1'))
- sess.save(Manager(name='m1', employee_data='ed2', manager_data='md1'))
+ sess.add(Person(name='p1'))
+ sess.add(Employee(name='e1', employee_data='ed1'))
+ sess.add(Manager(name='m1', employee_data='ed2', manager_data='md1'))
sess.flush()
- sess.clear()
+ sess.expunge_all()
self.assertEquals(sess.query(Person).order_by(Person.person_id).all(), [
Person(name='p1'),
Employee(name='e1', employee_data='ed1'),
Manager(name='m1', employee_data='ed2', manager_data='md1')
])
- sess.clear()
+ sess.expunge_all()
self.assertEquals(sess.query(Employee).order_by(Person.person_id).all(), [
Employee(name='e1', employee_data='ed1'),
Manager(name='m1', employee_data='ed2', manager_data='md1')
])
- sess.clear()
+ sess.expunge_all()
self.assertEquals(sess.query(Manager).order_by(Person.person_id).all(), [
Manager(name='m1', employee_data='ed2', manager_data='md1')
])
- sess.clear()
+ sess.expunge_all()
def go():
self.assertEquals(sess.query(Person).with_polymorphic('*').order_by(Person.person_id).all(), [
ad1 = Address(email_address='somenewaddress', id=12)
sess.add(ad1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
ad2 = sess.query(Address).get(1)
ad3 = sess.query(Address).get(ad1.id)
p3.places.append(p4)
sess.flush()
- sess.clear()
+ sess.expunge_all()
l = sess.query(Place).order_by(place.c.place_id).all()
(p1, p2, p3, p4, p5, p6, p7) = l
assert p1.places == [p2,p3,p5]
sess.add(tran)
sess.flush()
- sess.clear()
+ sess.expunge_all()
r = sess.query(Transition).all()
self.assert_unordered_result(r, Transition,
{'name': 'transition1',
self.assert_(len(c1.students) == 1)
sess.add(s1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
s = sess.query(Student).filter_by(name='Student1').one()
c = sess.query(Course).filter_by(name='Course3').one()
self.assert_(len(s.courses) == 3)
u.name = 'ed'
u3 = User()
u3.name = 'some user'
- sess.save(u3)
+ sess.add(u3)
sess.flush()
sess.rollback()
m = mapper(User, users)
m.add_property('name', synonym('_name', map_column=True))
- sess.clear()
+ sess.expunge_all()
u = sess.query(User).filter_by(name='jack').one()
eq_(u._name, 'jack')
eq_(u.name, 'jack')
eq_(u.uc_name, "SOME USER NAME")
sess.flush()
- sess.clear()
+ sess.expunge_all()
q = sess.query(User)
u2 = q.filter(User.name=='some user name').one()
eq_(len(u.addresses), 3)
self.sql_count_(0, go)
- sess.clear()
+ sess.expunge_all()
u = sess.query(User).filter_by(id=8).one()
eq_(u.id, 8)
eq_(l, self.static.user_address_result)
self.sql_count_(1, go)
- sess.clear()
+ sess.expunge_all()
# then select just from users. run it into instances.
# then assert the data, which will launch 3 more lazy loads
eq_(l, self.static.user_all_result)
self.assert_sql_count(testing.db, go, 1)
- sess.clear()
+ sess.expunge_all()
# then select just from users. run it into instances.
# then assert the data, which will launch 6 more lazy loads
eq_(u1.name, 'ed modified')
sess.add(u1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(sess.query(User).filter_by(name='ed modified').one(), User(name='ed'))
u1.addresses.append(Address(id=15, email_address='foo@bar.com'))
sess.add(u1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(
sess.query(User).filter_by(name='edward').one(),
User(name='edward', addresses=[Address(email_address='foo@bar.com')])
sess = create_session()
o = Order()
- sess.save(o)
+ sess.add(o)
o.id = 7
def go():
o.description = "some description"
sess = create_session()
o = Order()
- sess.save(o)
+ sess.add(o)
o.id = 7
def go():
o.description = "some description"
sess = create_session()
o = Order()
- sess.save(o)
+ sess.add(o)
def go():
o.description = "some description"
self.sql_count_(0, go)
("SELECT orders.user_id AS orders_user_id "
"FROM orders WHERE orders.id = :param_1",
{'param_1':1})])
- sess.clear()
+ sess.expunge_all()
q2 = q.options(sa.orm.undefer('user_id'))
self.sql_eq_(q2.all, [
self.sql_count_(1, go)
eq_(item.description, 'item 4')
- sess.clear()
+ sess.expunge_all()
l = q.options(sa.orm.undefer('orders.items.description')).all()
item = l[0].orders[1].items[1]
def go():
def test_query_twice_with_clear(self):
session = create_session()
result = session.query(Thing).first()
- session.clear()
+ session.expunge_all()
thing = session.query(Thing).options(sa.orm.undefer("name")).first()
self._test(thing)
def test_eagerload_with_clear(self):
session = create_session()
human = session.query(Human).options(sa.orm.eagerload("thing")).first()
- session.clear()
+ session.expunge_all()
thing = session.query(Thing).options(sa.orm.undefer("name")).first()
self._test(thing)
def test_join_with_clear(self):
session = create_session()
result = session.query(Human).add_entity(Thing).join("thing").first()
- session.clear()
+ session.expunge_all()
thing = session.query(Thing).options(sa.orm.undefer("name")).first()
self._test(thing)
g.version_id=1
g.edges.append(Edge(Point(3, 4), Point(5, 6)))
g.edges.append(Edge(Point(14, 5), Point(2, 7)))
- sess.save(g)
+ sess.add(g)
sess.flush()
- sess.clear()
+ sess.expunge_all()
g2 = sess.query(Graph).get([g.id, g.version_id])
for e1, e2 in zip(g.edges, g2.edges):
eq_(e1.start, e2.start)
g2.edges[1].end = Point(18, 4)
sess.flush()
- sess.clear()
+ sess.expunge_all()
e = sess.query(Edge).get(g2.edges[1].id)
eq_(e.end, Point(18, 4))
e.end.x = 19
e.end.y = 5
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(sess.query(Edge).get(g2.edges[1].id).end, Point(19, 5))
g.edges[1].end = Point(19, 5)
- sess.clear()
+ sess.expunge_all()
def go():
g2 = (sess.query(Graph).
options(sa.orm.eagerload('edges'))).get([g.id, g.version_id])
sess = create_session()
g = Graph(Version(1, 1))
- sess.save(g)
+ sess.add(g)
sess.flush()
- sess.clear()
+ sess.expunge_all()
g2 = sess.query(Graph).get([1, 1])
eq_(g.version, g2.version)
- sess.clear()
+ sess.expunge_all()
g2 = sess.query(Graph).get(Version(1, 1))
eq_(g.version, g2.version)
@testing.fails_on_everything_except("sqlite")
def go():
g = Graph(Version(2, None))
- sess.save(g)
+ sess.add(g)
sess.flush()
- sess.clear()
+ sess.expunge_all()
g2 = sess.query(Graph).filter_by(version=Version(2, None)).one()
eq_(g.version, g2.version)
go()
sess = create_session()
f1 = Foobar()
f1.foob = FBComposite(None, 5, None, None)
- sess.save(f1)
+ sess.add(f1)
sess.flush()
assert f1.foob == FBComposite(2, 5, 15, None)
sess = create_session()
f1 = Foobar()
f1.foob = FBComposite(None, 5, None, None)
- sess.save(f1)
+ sess.add(f1)
sess.flush()
assert f1.foob == FBComposite(2, 5, 15, None)
e = Edge(None, None)
g.edges.append(e)
- sess.save(g)
+ sess.add(g)
sess.flush()
- sess.clear()
+ sess.expunge_all()
g2 = sess.query(Graph).get([1, 1])
assert g2.edges[-1].start.x is None
mapper(User, users, extension=Ext())
sess = create_session()
u = User(name='u1')
- sess.save(u)
+ sess.add(u)
sess.flush()
u = sess.query(User).populate_existing().get(u.id)
- sess.clear()
+ sess.expunge_all()
u = sess.query(User).get(u.id)
u.name = 'u1 changed'
sess.flush()
sess = create_session()
am = AdminUser(name='au1', email_address='au1@e1')
- sess.save(am)
+ sess.add(am)
sess.flush()
am = sess.query(AdminUser).populate_existing().get(am.id)
- sess.clear()
+ sess.expunge_all()
am = sess.query(AdminUser).get(am.id)
am.name = 'au1 changed'
sess.flush()
sess = create_session()
i1 = Item(description="i1")
k1 = Keyword(name="k1")
- sess.save(i1)
- sess.save(k1)
+ sess.add(i1)
+ sess.add(k1)
sess.flush()
eq_(methods,
['instrument_class', 'instrument_class', 'init_instance',
sess = create_session()
am = AdminUser(name="au1", email_address="au1@e1")
- sess.save(am)
+ sess.add(am)
sess.flush()
am = sess.query(AdminUser).populate_existing().get(am.id)
- sess.clear()
+ sess.expunge_all()
am = sess.query(AdminUser).get(am.id)
am.name = 'au1 changed'
sess.flush()
u1.name = 'ed'
sess.add(u1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
assert sess.query(User).first()
s = create_session()
for i in range(3):
h1 = H1()
- s.save(h1)
+ s.add(h1)
h1.h2s.append(H2())
h1.h3s.extend([H3(), H3()])
m = Map(state='AK', mapper=c)
sess = create_session()
- sess.save(c)
+ sess.add(c)
sess.flush()
- sess.clear()
+ sess.expunge_all()
for C, M in ((Cartographer, Map),
(sa.orm.aliased(Cartographer), sa.orm.aliased(Map))):
assert u2 in sess
eq_(u2, User(id=7, name='fred'))
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(sess.query(User).first(), User(id=7, name='fred'))
@testing.resolve_artifact_names
assert merged_users[0] is not u
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(sess.query(User).one(),
User(id=7, name='fred', addresses=OrderedSet([
u = User(id=7, name='fred')
sess.add(u)
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(on_load.called, 0)
assert u2 is not _u2
eq_(on_load.called, 1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(sess.query(User).first(), User(id=7, name='fred jones'))
eq_(on_load.called, 2)
sess = create_session()
sess.add(u)
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(on_load.called, 0)
]))
)
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(sess.query(User).one(),
User(id=7, name='fred', addresses=OrderedSet([
Address(id=3, email_address='fred3'),
sess = create_session()
sess.add(u)
sess.flush()
- sess.clear()
+ sess.expunge_all()
u.name='fred jones'
u.addresses.add(Address(id=3, email_address='fred3'))
u = sess.merge(u)
eq_(on_load.called, 4)
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(sess.query(User).first(),
User(id=7, name='fred jones', addresses=OrderedSet([
Address(email_address='hoho@bar.com')]))
sess.flush()
- sess.clear()
+ sess.expunge_all()
u2 = sess.query(User).get(7)
eq_(u2, User(id=7, name='fred', addresses=[
sess3.flush()
# assert modified/merged data was saved
- sess.clear()
+ sess.expunge_all()
u = sess.query(User).get(7)
eq_(u, User(id=7, name='fred2', addresses=[
Address(email_address='foo@bar.com'),
assert not sa.orm.object_mapper(a2)._is_orphan(
sa.orm.attributes.instance_state(a2))
sess2.flush()
- sess2.clear()
+ sess2.expunge_all()
eq_(sess2.query(User).get(u2.id).addresses[0].email_address,
'somenewaddress')
assert not sa.orm.object_mapper(a2)._is_orphan(
sa.orm.attributes.instance_state(a2))
sess2.flush()
- sess2.clear()
+ sess2.expunge_all()
eq_(sess2.query(User).get(u2.id).addresses[0].email_address,
'somenewaddress')
except sa.exc.InvalidRequestError, e:
sess = create_session()
u = User()
u.name = 'ed'
- sess.save(u)
+ sess.add(u)
sess.flush()
sess.expunge(u)
sess.merge(u)
assert sess.query(User).get('jack') is None
- sess.clear()
+ sess.expunge_all()
u1 = sess.query(User).get('ed')
self.assertEquals(User(username='ed', fullname='jack'), u1)
sess.expire(u1)
self.assertRaises(sa.orm.exc.ObjectDeletedError, getattr, u1, 'username')
- sess.clear()
+ sess.expunge_all()
assert sess.query(User).get('jack') is None
assert sess.query(User).get('ed').fullname == 'jack'
sess.expire(u1)
u1.username = 'ed'
sess.flush()
- sess.clear()
+ sess.expunge_all()
assert sess.query(User).get('ed').fullname == 'jack'
sess.flush()
assert u1.addresses[0].username == 'ed'
- sess.clear()
+ sess.expunge_all()
self.assertEquals([Address(username='ed'), Address(username='ed')], sess.query(Address).all())
u1 = sess.query(User).get('ed')
self.assert_sql_count(testing.db, go, 4) # test passive_updates=False; load addresses, update user, update 2 addresses
else:
self.assert_sql_count(testing.db, go, 1) # test passive_updates=True; update user
- sess.clear()
+ sess.expunge_all()
assert User(username='jack', addresses=[Address(username='jack'), Address(username='jack')]) == sess.query(User).get('jack')
u1 = sess.query(User).get('jack')
u1.addresses = []
u1.username = 'fred'
sess.flush()
- sess.clear()
+ sess.expunge_all()
assert sess.query(Address).get('jack1').username is None
u1 = sess.query(User).get('fred')
self.assertEquals(User(username='fred', fullname='jack'), u1)
self.assert_sql_count(testing.db, go, 0)
assert a1.username == a2.username == 'ed'
- sess.clear()
+ sess.expunge_all()
self.assertEquals([Address(username='ed'), Address(username='ed')], sess.query(Address).all())
@testing.fails_on('sqlite', 'FIXME: unknown')
else:
self.assert_sql_count(testing.db, go, 3)
self.assertEquals([Address(username='ed'), Address(username='ed')], [ad1, ad2])
- sess.clear()
+ sess.expunge_all()
self.assertEquals([Address(username='ed'), Address(username='ed')], sess.query(Address).all())
u1 = sess.query(User).get('ed')
self.assert_sql_count(testing.db, go, 1)
else:
self.assert_sql_count(testing.db, go, 3)
- sess.clear()
+ sess.expunge_all()
self.assertEquals([Address(username='fred'), Address(username='fred')], sess.query(Address).all())
sess.flush()
self.assert_sql_count(testing.db, go, 0)
- sess.clear()
+ sess.expunge_all()
r = sess.query(Item).all()
self.assertEquals(Item(itemname='item1'), r[0])
self.assertEquals(['jack'], [u.username for u in r[0].users])
self.assertEquals(Item(itemname='item2'), r[1])
self.assertEquals(['ed', 'jack'], sorted([u.username for u in r[1].users]))
- sess.clear()
+ sess.expunge_all()
u2 = sess.query(User).get(u2.username)
u2.username='wendy'
sess.flush()
assert u1.addresses[0].username == 'ed'
self.assertEquals(sa.select([addresses.c.username]).execute().fetchall(), [('ed',), ('ed',)])
- sess.clear()
+ sess.expunge_all()
self.assertEquals([Address(username='ed'), Address(username='ed')], sess.query(Address).all())
u1 = sess.query(User).get(u1.id)
self.assert_sql_count(testing.db, go, 4) # test passive_updates=False; load addresses, update user, update 2 addresses
else:
self.assert_sql_count(testing.db, go, 1) # test passive_updates=True; update user
- sess.clear()
+ sess.expunge_all()
assert User(username='jack', addresses=[Address(username='jack'), Address(username='jack')]) == sess.query(User).get(u1.id)
- sess.clear()
+ sess.expunge_all()
u1 = sess.query(User).get(u1.id)
u1.addresses = []
u1.username = 'fred'
sess.flush()
- sess.clear()
+ sess.expunge_all()
a1 = sess.query(Address).get(a1.id)
self.assertEquals(a1.username, None)
p.jack = None
assert j.port is None
- ctx.clear()
+ ctx.expunge_all()
j = ctx.query(Jack).get(jid)
p = ctx.query(Port).get(pid)
sess.add(u2)
sess.flush()
- sess.clear()
+ sess.expunge_all()
self.assertEquals(u1, sess.query(User).get(u2.id))
u1.addresses.append(Address(email_address='ed@bar.com'))
sess.add(u1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
u1 = sess.query(User).get(u1.id)
assert 'name' not in u1.__dict__
assert 'addresses' not in u1.__dict__
u2 = pickle.loads(pickle.dumps(u1))
sess2 = create_session()
- sess2.update(u2)
+ sess2.add(u2)
self.assertEquals(u2.name, 'ed')
self.assertEquals(u2, User(name='ed', addresses=[Address(email_address='ed@bar.com')]))
u1.addresses.append(Address(email_address='ed@bar.com'))
sess.add(u1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
u1 = sess.query(User).options(sa.orm.defer('name'), sa.orm.defer('addresses.email_address')).get(u1.id)
assert 'name' not in u1.__dict__
u2 = pickle.loads(pickle.dumps(u1))
sess2 = create_session()
- sess2.update(u2)
+ sess2.add(u2)
self.assertEquals(u2.name, 'ed')
assert 'addresses' not in u2.__dict__
ad = u2.addresses[0]
sess = create_session()
sess.add(eu)
sess.flush()
- sess.clear()
+ sess.expunge_all()
eu = sess.query(User).first()
eu2 = pickle.loads(pickle.dumps(eu))
sess2 = create_session()
- sess2.update(eu2)
+ sess2.add(eu2)
assert 'email_address' not in eu2.__dict__
self.assertEquals(eu2.email_address, 'foo@bar.com')
u = s.query(User).get(7)
u2 = s.query(User).get(7)
assert u is u2
- s.clear()
+ s.expunge_all()
u2 = s.query(User).get(7)
assert u is not u2
u = s.query(User).populate_existing().get(7)
u2 = s.query(User).populate_existing().get(7)
assert u is u2
- s.clear()
+ s.expunge_all()
u2 = s.query(User).populate_existing().get(7)
assert u is not u2
self.assertEquals([User(id=7),User(id=8),User(id=9)], sess.query(User).filter(User.addresses!=None).order_by(User.id).all())
-class FromSelfTest(QueryTest):
+class FromSelfTest(QueryTest, AssertsCompiledSQL):
def test_filter(self):
assert [User(id=8), User(id=9)] == create_session().query(User).filter(User.id.in_([8,9]))._from_self().all()
(User(id=8), Address(id=3)),
(User(id=8), Address(id=4)),
(User(id=9), Address(id=5))
- ] == create_session().query(User).filter(User.id.in_([8,9]))._from_self().join('addresses').add_entity(Address).order_by(User.id, Address.id).all()
+ ] == create_session().query(User).filter(User.id.in_([8,9]))._from_self().\
+ join('addresses').add_entity(Address).order_by(User.id, Address.id).all()
+ def test_no_eagerload(self):
+ """test that eagerloads are pushed outwards and not rendered in subqueries."""
+
+ s = create_session()
+
+ self.assert_compile(
+ s.query(User).options(eagerload(User.addresses)).from_self().statement,
+ "SELECT anon_1.users_id, anon_1.users_name, addresses_1.id, addresses_1.user_id, "\
+ "addresses_1.email_address FROM (SELECT users.id AS users_id, users.name AS users_name FROM users) AS anon_1 "\
+ "LEFT OUTER JOIN addresses AS addresses_1 ON anon_1.users_id = addresses_1.user_id ORDER BY addresses_1.id"
+ )
+
+
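The assert_compile() above pins down what test_no_eagerload describes: when a Query carrying an eagerload() option is wrapped via from_self(), the LEFT OUTER JOIN to addresses is rendered against the outer statement, not inside the anon_1 subquery. A brief sketch of that call sequence, assuming the User/Address fixtures used by these tests:

    from sqlalchemy.orm import create_session, eagerload

    sess = create_session()
    q = sess.query(User).options(eagerload(User.addresses)).from_self()

    # The plain users SELECT is wrapped as the anon_1 subquery and the
    # eager join to addresses is applied outside of it, matching the SQL
    # asserted in test_no_eagerload.
    print q.statement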
def test_multiple_entities(self):
sess = create_session()
# order_by(User.id, Address.id).first(),
(User(id=8, addresses=[Address(), Address(), Address()]), Address(id=2)),
)
+
+class SetOpsTest(QueryTest, AssertsCompiledSQL):
+
+ def test_union(self):
+ s = create_session()
+
+ fred = s.query(User).filter(User.name=='fred')
+ ed = s.query(User).filter(User.name=='ed')
+ jack = s.query(User).filter(User.name=='jack')
+
+ self.assertEquals(fred.union(ed).order_by(User.name).all(),
+ [User(name='ed'), User(name='fred')]
+ )
+
+ self.assertEquals(fred.union(ed, jack).order_by(User.name).all(),
+ [User(name='ed'), User(name='fred'), User(name='jack')]
+ )
+
+ @testing.fails_on('mysql', "mysql doesn't support intersect")
+ def test_intersect(self):
+ s = create_session()
+
+ fred = s.query(User).filter(User.name=='fred')
+ ed = s.query(User).filter(User.name=='ed')
+ jack = s.query(User).filter(User.name=='jack')
+ self.assertEquals(fred.intersect(ed, jack).all(),
+ []
+ )
+
+ self.assertEquals(fred.union(ed).intersect(ed.union(jack)).all(),
+ [User(name='ed')]
+ )
+
+ def test_eager_load(self):
+ s = create_session()
+
+ fred = s.query(User).filter(User.name=='fred')
+ ed = s.query(User).filter(User.name=='ed')
+ jack = s.query(User).filter(User.name=='jack')
+
+ def go():
+ self.assertEquals(
+ fred.union(ed).order_by(User.name).options(eagerload(User.addresses)).all(),
+ [
+ User(name='ed', addresses=[Address(), Address(), Address()]),
+ User(name='fred', addresses=[Address()])
+ ]
+ )
+ self.assert_sql_count(testing.db, go, 1)
+
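SetOpsTest above exercises the new Query-level set operations. A short usage sketch against the same fixture data ('fred', 'ed', and 'jack' are User rows assumed to exist):

    from sqlalchemy.orm import create_session

    s = create_session()

    fred = s.query(User).filter(User.name == 'fred')
    ed = s.query(User).filter(User.name == 'ed')
    jack = s.query(User).filter(User.name == 'jack')

    # union() takes one or more Query objects and returns a new Query
    # selecting from the combined statement; order_by() and eagerload()
    # options still apply, as test_eager_load demonstrates.
    print fred.union(ed, jack).order_by(User.name).all()

    # intersect() follows the same pattern (skipped on MySQL above,
    # which lacks INTERSECT support).
    print fred.union(ed).intersect(ed.union(jack)).all()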
class AggregateTest(QueryTest):
assert [User(id=7), User(id=9), User(id=8)] == q.all()
- sess.clear()
+ sess.expunge_all()
# test that it works on embedded eagerload/LIMIT subquery
q = sess.query(User).join('addresses').distinct().options(eagerload('addresses')).order_by(desc(Address.email_address)).limit(2)
l = q.order_by(User.id, AdAlias.id).all()
self.assertEquals(l, expected)
- sess.clear()
+ sess.expunge_all()
q = sess.query(User).add_entity(AdAlias)
l = q.select_from(outerjoin(User, AdAlias)).filter(AdAlias.email_address=='ed@bettyboop.com').all()
assert fixtures.user_address_result == l
self.assert_sql_count(testing.db, go, 1)
- sess.clear()
+ sess.expunge_all()
def go():
l = q.options(contains_alias('ulist'), contains_eager('addresses')).from_statement(query).all()
def go():
assert fixtures.user_address_result == q.all()
self.assert_sql_count(testing.db, go, 1)
- sess.clear()
+ sess.expunge_all()
adalias = addresses.alias()
q = sess.query(User).select_from(users.outerjoin(adalias)).options(contains_eager(User.addresses, alias=adalias))
def go():
self.assertEquals(fixtures.user_address_result, q.order_by(User.id).all())
self.assert_sql_count(testing.db, go, 1)
- sess.clear()
+ sess.expunge_all()
selectquery = users.outerjoin(addresses).select(users.c.id<10, use_labels=True, order_by=[users.c.id, addresses.c.id])
q = sess.query(User)
assert fixtures.user_address_result[0:3] == l
self.assert_sql_count(testing.db, go, 1)
- sess.clear()
+ sess.expunge_all()
def go():
l = list(q.options(contains_eager(User.addresses)).instances(selectquery.execute()))
assert fixtures.user_address_result[0:3] == l
self.assert_sql_count(testing.db, go, 1)
- sess.clear()
+ sess.expunge_all()
def go():
l = q.options(contains_eager('addresses')).from_statement(selectquery).all()
l = list(q.options(contains_eager('addresses', alias="adalias")).instances(selectquery.execute()))
assert fixtures.user_address_result == l
self.assert_sql_count(testing.db, go, 1)
- sess.clear()
+ sess.expunge_all()
# expression.Alias object
def go():
assert fixtures.user_address_result == l
self.assert_sql_count(testing.db, go, 1)
- sess.clear()
+ sess.expunge_all()
# Aliased object
adalias = aliased(Address)
l = q.options(contains_eager('addresses', alias=adalias)).outerjoin((adalias, User.addresses)).order_by(User.id, adalias.id)
assert fixtures.user_address_result == l.all()
self.assert_sql_count(testing.db, go, 1)
- sess.clear()
+ sess.expunge_all()
oalias = orders.alias('o1')
ialias = items.alias('i1')
assert fixtures.user_order_result == l
self.assert_sql_count(testing.db, go, 1)
- sess.clear()
+ sess.expunge_all()
# test using Alias with more than one level deep
def go():
l = list(q.options(contains_eager('orders', alias=oalias), contains_eager('orders.items', alias=ialias)).instances(query.execute()))
assert fixtures.user_order_result == l
self.assert_sql_count(testing.db, go, 1)
- sess.clear()
+ sess.expunge_all()
# test using Aliased with more than one level deep
oalias = aliased(Order)
outerjoin((oalias, User.orders), (ialias, oalias.items)).order_by(User.id, oalias.id, ialias.id)
assert fixtures.user_order_result == l.all()
self.assert_sql_count(testing.db, go, 1)
- sess.clear()
+ sess.expunge_all()
def test_mixed_eager_contains_with_limit(self):
sess = create_session()
Order(address_id=None,user_id=7,description=u'order 5',isopen=0,id=5)
])])
self.assert_sql_count(testing.db, go, 1)
- sess.clear()
+ sess.expunge_all()
def go():
# same as above, except Order is aliased, so two adapters are applied by the
selectquery = users.outerjoin(addresses).select(use_labels=True, order_by=[users.c.id, addresses.c.id])
self.assertEquals(list(sess.query(User, Address).instances(selectquery.execute())), expected)
- sess.clear()
+ sess.expunge_all()
for address_entity in (Address, aliased(Address)):
q = sess.query(User).add_entity(address_entity).outerjoin(('addresses', address_entity)).order_by(User.id, address_entity.id)
self.assertEquals(q.all(), expected)
- sess.clear()
+ sess.expunge_all()
q = sess.query(User).add_entity(address_entity)
q = q.join(('addresses', address_entity)).filter_by(email_address='ed@bettyboop.com')
self.assertEquals(q.all(), [(user8, address3)])
- sess.clear()
+ sess.expunge_all()
q = sess.query(User, address_entity).join(('addresses', address_entity)).filter_by(email_address='ed@bettyboop.com')
self.assertEquals(q.all(), [(user8, address3)])
- sess.clear()
+ sess.expunge_all()
q = sess.query(User, address_entity).join(('addresses', address_entity)).options(eagerload('addresses')).filter_by(email_address='ed@bettyboop.com')
self.assertEquals(list(util.OrderedSet(q.all())), [(user8, address3)])
- sess.clear()
+ sess.expunge_all()
def test_aliased_multi_mappers(self):
sess = create_session()
l = q.order_by(User.id, adalias.c.id).all()
assert l == expected
- sess.clear()
+ sess.expunge_all()
q = sess.query(User).add_entity(Address, alias=adalias)
l = q.select_from(users.outerjoin(adalias)).filter(adalias.c.email_address=='ed@bettyboop.com').all()
for add_col in (User.name, users.c.name):
assert sess.query(User).add_column(add_col).all() == expected
- sess.clear()
+ sess.expunge_all()
self.assertRaises(sa_exc.InvalidRequestError, sess.query(User).add_column, object())
q = sess.query(User)
q = q.group_by([c for c in users.c]).order_by(User.id).outerjoin('addresses').add_column(func.count(Address.id).label('count'))
self.assertEquals(q.all(), expected)
- sess.clear()
+ sess.expunge_all()
adalias = aliased(Address)
q = sess.query(User)
q = q.group_by([c for c in users.c]).order_by(User.id).outerjoin(('addresses', adalias)).add_column(func.count(adalias.id).label('count'))
self.assertEquals(q.all(), expected)
- sess.clear()
+ sess.expunge_all()
s = select([users, func.count(addresses.c.id).label('count')]).select_from(users.outerjoin(addresses)).group_by(*[c for c in users.c]).order_by(User.id)
q = sess.query(User)
l = q.add_column("count").add_column("concat").from_statement(s).all()
assert l == expected
- sess.clear()
+ sess.expunge_all()
# test with select_from()
q = create_session().query(User).add_column(func.count(addresses.c.id))\
.group_by([c for c in users.c]).order_by(users.c.id)
assert q.all() == expected
- sess.clear()
+ sess.expunge_all()
q = create_session().query(User).add_column(func.count(addresses.c.id))\
.add_column(("Name:" + users.c.name)).outerjoin('addresses')\
.group_by([c for c in users.c]).order_by(users.c.id)
assert q.all() == expected
- sess.clear()
+ sess.expunge_all()
q = create_session().query(User).add_column(func.count(adalias.c.id))\
.add_column(("Name:" + users.c.name)).outerjoin(('addresses', adalias))\
.group_by([c for c in users.c]).order_by(users.c.id)
assert q.all() == expected
- sess.clear()
+ sess.expunge_all()
class ImmediateTest(_fixtures.FixtureTest):
])
self.assert_sql_count(testing.db, go, 1)
- sess.clear()
+ sess.expunge_all()
sel2 = orders.select(orders.c.id.in_([1,2,3]))
self.assertEquals(sess.query(Order).select_from(sel2).join(['items', 'keywords']).filter(Keyword.name == 'red').order_by(Order.id).all(), [
Order(description=u'order 1',id=1),
]
)
self.assert_sql_count(testing.db, go, 1)
- sess.clear()
+ sess.expunge_all()
def go():
self.assertEquals(sess.query(User).options(eagerload('addresses')).select_from(sel).filter(User.id==8).order_by(User.id).all(),
[User(id=8, addresses=[Address(id=2), Address(id=3), Address(id=4)])]
)
self.assert_sql_count(testing.db, go, 1)
- sess.clear()
+ sess.expunge_all()
def go():
self.assertEquals(sess.query(User).options(eagerload('addresses')).select_from(sel).order_by(User.id)[1], User(id=8, addresses=[Address(id=2), Address(id=3), Address(id=4)]))
n1.children[1].append(Node(data='n121'))
n1.children[1].append(Node(data='n122'))
n1.children[1].append(Node(data='n123'))
- sess.save(n1)
+ sess.add(n1)
sess.flush()
sess.close()
n2.children = [n3, n6, n7]
n3.children = [n5, n4]
- sess.save(n1)
- sess.save(n2)
- sess.save(n3)
- sess.save(n4)
+ sess.add(n1)
+ sess.add(n2)
+ sess.add(n3)
+ sess.add(n4)
sess.flush()
sess.close()
# run the eager version twice to test caching of aliased clauses
for x in range(2):
- sess.clear()
+ sess.expunge_all()
def go():
self.assertEquals(sess.query(Address).options(eagerload('user')).all(), address_result)
self.assert_sql_count(testing.db, go, 1)
sess.add_all((c1, c2))
sess.flush()
- sess.clear()
+ sess.expunge_all()
test_c1 = sess.query(Company).get(c1.company_id)
test_e1 = sess.query(Employee).get([c1.company_id, e1.emp_id])
sess.add_all((c1, c2))
sess.flush()
- sess.clear()
+ sess.expunge_all()
test_c1 = sess.query(Company).get(c1.company_id)
test_e1 = sess.query(Employee).get([c1.company_id, e1.emp_id])
s.flush()
- s.clear()
+ s.expunge_all()
j = s.query(Job).filter_by(jobno=u'somejob').one()
oldp = list(j.pages)
j.pages = []
s.flush()
- s.clear()
+ s.expunge_all()
j = s.query(Job).filter_by(jobno=u'somejob2').one()
j.pages[1].current_version = 12
s.delete(j)
sess.flush()
assert a1 not in sess
assert b1 not in sess
- sess.clear()
+ sess.expunge_all()
sa.orm.clear_mappers()
@testing.resolve_artifact_names
sess.flush()
assert a1 not in sess
assert b1 not in sess
- sess.clear()
+ sess.expunge_all()
sa.orm.clear_mappers()
@testing.resolve_artifact_names
sess.flush()
assert a1 not in sess
assert b1 not in sess
- sess.clear()
+ sess.expunge_all()
@testing.resolve_artifact_names
def test_delete_manual_BtoA(self):
con.lineItems.append(li)
session.add(li)
session.flush()
- session.clear()
+ session.expunge_all()
newcon = session.query(Container).first()
assert con.policyNum == newcon.policyNum
assert len(newcon.lineItems) == 10
t1.foo.append(TagInstance(data='not_iplc_case'))
sess.add(t1)
sess.flush()
- sess.clear()
+ sess.expunge_all()
# relation works
eq_(sess.query(Tag).all(), [Tag(data='some tag', foo=[TagInstance(data='iplc_case')])])
sess.add(c1)
sess.add(c3)
sess.flush()
- sess.clear()
+ sess.expunge_all()
c1 = sess.query(C1).get(c1.id)
assert set([x.id for x in c1.t2s]) == set([c2a.id, c2b.id])
sess.add_all((c1, c3))
sess.flush()
- sess.clear()
+ sess.expunge_all()
c1 = sess.query(C1).get(c1.t1id)
assert set([x.t2id for x in c1.t2s]) == set([c2a.t2id, c2b.t2id])
sess.add_all((f1, f2))
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(sess.query(Foo).filter_by(id=f1.id).one(),
Foo(bars=[Bar(data='b1'), Bar(data='b2')]))
eq_(sess.query(Foo).filter_by(id=f2.id).one(),
sess.add_all((b1, b2, b3, b4))
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(sess.query(Foo).filter_by(id=f1.id).one(),
Foo(bars=[Bar(data='b1'), Bar(data='b2'), Bar(data='b4')]))
eq_(sess.query(Foo).filter_by(id=f2.id).one(),
sess = create_session()
sess.add(T2(data='t2', t1=T1(data='t1'), t3s=[T3(data='t3')]))
sess.flush()
- sess.clear()
+ sess.expunge_all()
a = sess.query(T1).first()
eq_(a.t3s, [T3(data='t3')])
b3 = T2(data='b3', t1id='Number2')
sess.add_all((a1, a2, b1, b2, b3))
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(sess.query(T1).first(),
T1(id='number1', data='a1', t2s=[
b3 = T2(data='b3', t1id='Number2')
sess.add_all((a1, a2, b1, b2, b3))
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(sess.query(T2).filter(T2.data.in_(['b1', 'b2'])).all(),
[T2(data='b1', t1=[T1(id='number1', data='a1')]),
b3 = T2(data='b2', t1id='number2')
sess.add_all((a1, a2, b1, b2, b3))
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(sess.query(T1).first(),
T1(id='NuMbeR1', data='a1', t2s=[
b3 = T2(data='b3', t1id='number2')
sess.add_all((a1, a2, b1, b2, b3))
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(sess.query(T2).filter(T2.data.in_(['b1', 'b2'])).all(),
[T2(data='b1', t1=[T1(id='NuMbeR1', data='a1')]),
sso = SomeOtherObject()
s.options.append(sso)
Session.flush()
- Session.clear()
+ Session.expunge_all()
@testing.resolve_artifact_names
def test_query(self):
b = BaseClass(data='b1')
s = SubClass(data='s1', somedata='somedata')
Session.commit()
- Session.clear()
+ Session.expunge_all()
eq_(expunge_list([BaseClass(data='b1'),
SubClass(data='s1', somedata='somedata')]),
sess = create_session(bind=testing.db)
sess.add(Subset(data=1))
sess.flush()
- sess.clear()
+ sess.expunge_all()
eq_(sess.query(Subset).all(), [Subset(data=1)])
eq_(sess.query(Subset).filter(Subset.data==1).one(), Subset(data=1))
assert c.scalar("select count(1) from users") == 1
+ @testing.uses_deprecated()
@engines.close_open_connections
@testing.resolve_artifact_names
def test_save_update_delete(self):
# modify outside of session, assert changes remain/get saved
user.name = "fred"
- s.update(user)
+ s.add(user)
assert user in s
assert user in s.dirty
s.flush()
- s.clear()
+ s.expunge_all()
assert s.query(User).count() == 1
user = s.query(User).one()
assert user.name == 'fred'
# ensure it's not dirty if no changes occur
- s.clear()
+ s.expunge_all()
assert user not in s
- s.update(user)
+ s.add(user)
assert user in s
assert user not in s.dirty
u = User(name='fred')
s.add(u)
s.flush()
- s.clear()
+ s.expunge_all()
user = s.query(User).one()
assert user not in s.dirty
assert a not in s
s.flush()
print "\n".join([repr(x.__dict__) for x in s])
- s.clear()
+ s.expunge_all()
assert s.query(User).one().id == u.id
assert s.query(Address).first() is None
assert u not in s
assert a in s
s.flush()
- s.clear()
+ s.expunge_all()
assert s.query(Address).one().id == a.id
assert s.query(User).first() is None
sess.add(User(name='u2'))
sess.flush()
- sess.clear()
+ sess.expunge_all()
self.assertEquals(sess.query(User).order_by(User.name).all(),
[
User(name='u1 modified'),
assert u2 is not None and u2 is not u1
assert u2 in sess
- self.assertRaises(Exception, lambda: sess.update(u1))
+ self.assertRaises(Exception, lambda: sess.add(u1))
sess.expunge(u2)
assert u2 not in sess
u1.name = "John"
u2.name = "Doe"
- sess.update(u1)
+ sess.add(u1)
assert u1 in sess
assert Session.object_session(u1) is sess
sess.flush()
- sess.clear()
+ sess.expunge_all()
u3 = sess.query(User).get(u1.id)
assert u3 is not u1 and u3 is not u2 and u3.name == u1.name
self._test_session().flush()
def test_clear(self):
- self._test_session().clear()
+ self._test_session().expunge_all()
def test_close(self):
self._test_session().close()
return mapper(cls, Table('t', sa.MetaData(),
Column('id', Integer, primary_key=True)))
+ @testing.uses_deprecated()
def _test_instance_guards(self, user_arg):
watchdog = set()
sess = create_session()
for c in [tokyo, newyork, toronto, london, dublin, brasilia, quito]:
- sess.save(c)
+ sess.add(c)
sess.commit()
tokyo.city # reload 'city' attribute on tokyo
- sess.clear()
+ sess.expunge_all()
eq_(db2.execute(weather_locations.select()).fetchall(), [(1, 'Asia', 'Tokyo')])
eq_(db1.execute(weather_locations.select()).fetchall(), [(2, 'North America', 'New York'), (3, 'North America', 'Toronto')])
assert u2.name == 'jack'
self.assertEquals(s.query(User.name).order_by(User.id).all(), [('ed',), ('jack',)])
+ @testing.requires.savepoints
+ def test_savepoint_delete(self):
+ s = self.session()
+ u1 = User(name='ed')
+ s.add(u1)
+ s.commit()
+ self.assertEquals(s.query(User).filter_by(name='ed').count(), 1)
+ s.begin_nested()
+ s.delete(u1)
+ s.commit()
+ self.assertEquals(s.query(User).filter_by(name='ed').count(), 0)
+ s.commit()
+
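test_savepoint_delete above covers a delete issued inside a SAVEPOINT block. A sketch of the same flow, assuming a transactional Session from sessionmaker() bound to a database with savepoint support (the engine itself is not shown in this excerpt):

    from sqlalchemy.orm import sessionmaker

    Session = sessionmaker(bind=engine)   # engine: hypothetical, must support SAVEPOINT
    s = Session()

    u1 = User(name='ed')
    s.add(u1)
    s.commit()

    s.begin_nested()     # issues SAVEPOINT
    s.delete(u1)
    s.commit()           # releases the savepoint; the row is deleted

    s.commit()           # commits the enclosing transaction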
@testing.requires.savepoints
def test_savepoint_commit(self):
s = self.session()
self.assert_(u.addresses == [a])
session.commit()
- session.clear()
+ session.expunge_all()
u = session.query(m).one()
assert u.addresses[0].user == u
session = create_session()
session.add(a1)
session.flush()
- session.clear()
+ session.expunge_all()
new_a1 = session.query(A).filter(t1.c.a == a1.a).one()
assert new_a1.a == a1.a
assert new_a1.t2s[0].d == b1.d
- session.clear()
+ session.expunge_all()
new_a1 = (session.query(A).options(sa.orm.eagerload('t2s')).
filter(t1.c.a == a1.a)).one()
assert new_a1.a == a1.a
assert new_a1.t2s[0].d == b1.d
- session.clear()
+ session.expunge_all()
new_a1 = session.query(A).filter(A.a == a1.a).one()
assert new_a1.a == a1.a
assert new_a1.t2s[0].d == b1.d
- session.clear()
+ session.expunge_all()
@testing.fails_on('mssql', 'pyodbc returns a non unicode encoding of the results description.')
@testing.resolve_artifact_names
session = create_session()
session.add_all((a1, b1))
session.flush()
- session.clear()
+ session.expunge_all()
eq_([A(b=5), B(e=7)], session.query(A).all())
session = create_session()
session.add(f1)
session.flush()
- session.clear()
+ session.expunge_all()
f2 = session.query(Foo).filter_by(id=f1.id).one()
assert 'data' in sa.orm.attributes.instance_state(f2).unmodified
assert f2 in session.dirty
assert 'data' not in sa.orm.attributes.instance_state(f2).unmodified
session.flush()
- session.clear()
+ session.expunge_all()
f3 = session.query(Foo).filter_by(id=f1.id).one()
ne_(f3.data,f1.data)
session = create_session(autocommit=False)
session.add(f1)
session.commit()
- session.clear()
+ session.expunge_all()
f1 = session.query(Foo).get(f1.id)
f1.val = u'hi'
f1.data[0]['personne']['VenSoir']= False
self.sql_count_(1, session.commit)
- session.clear()
+ session.expunge_all()
f = session.query(Foo).get(f1.id)
eq_(f.data,
[ {
session = create_session()
session.add(e)
session.flush()
- session.clear()
+ session.expunge_all()
e2 = session.query(Entry).get((e.multi_id, 2))
self.assert_(e is not e2)
assert (u.counter == 2) is True
self.sql_count_(1, go)
- session.clear()
+ session.expunge_all()
u = session.query(User).get(u.id)
eq_(u.name, 'test2')
eq_(u.counter, 2)
session.add(mc)
session.flush()
- session.clear()
+ session.expunge_all()
assert myothertable.count().scalar() == 4
mc = session.query(MyClass).get(mc.id)
mc.children.append(MyOtherClass())
session.add(mc)
session.flush()
- session.clear()
+ session.expunge_all()
assert myothertable.count().scalar() == 4
mc = session.query(MyClass).get(mc.id)
mc.children.append(MyOtherClass())
session.add(mc)
session.flush()
- session.clear()
+ session.expunge_all()
assert myothertable.count().scalar() == 1
self.assert_(h5.foober == 'im the new foober')
self.sql_count_(0, go)
- session.clear()
+ session.expunge_all()
(h1, h2, h3, h4, h5) = session.query(Hoho).order_by(Hoho.id).all()
session = create_session()
session.add(h1)
session.flush()
- session.clear()
+ session.expunge_all()
eq_(session.query(Hoho).get(h1.id),
Hoho(hoho=hohoval,
h1 = session.query(Hoho).get(h1.id)
h1.secondaries.append(Secondary(data='s2'))
session.flush()
- session.clear()
+ session.expunge_all()
eq_(session.query(Hoho).get(h1.id),
Hoho(hoho=hohoval,
session.delete(u1)
session.flush()
- session.clear()
+ session.expunge_all()
u2 = session.query(User).get(u2.id)
eq_(len(u2.addresses), 1)
u2.addresses.append(a)
session.flush()
- session.clear()
+ session.expunge_all()
u2 = session.query(User).get(u2.id)
eq_(len(u2.addresses), 1)
assert u is nu
# clear out the identity map, so next get forces a SELECT
- session.clear()
+ session.expunge_all()
# check it again, identity should be different but ids the same
nu = session.query(m).get(u.id)
# change first users name and save
session = create_session()
- session.update(u)
+ session.add(u)
u.name = 'modifiedname'
assert u in session.dirty
session.flush()
session = create_session()
session.add(u)
session.flush()
- session.clear()
+ session.expunge_all()
u = session.query(SUser).first()
eq_(u.syn_name, 'User:some name:User')
session = create_session()
session.add(u)
session.flush()
- session.clear()
+ session.expunge_all()
u = session.query(User).one()
u.name = 'newname'
session = create_session()
session.add(au)
session.flush()
- session.clear()
+ session.expunge_all()
rt = session.query(AddressUser).one()
eq_(au.user_id, rt.user_id)
eq_(list(session.execute(orders.select(), mapper=Order)),
[(42, None, None, 'foo', None)])
- session.clear()
+ session.expunge_all()
# assert that a set operation doesn't trigger a load operation
o = session.query(Order).filter(Order.description == 'foo').one()
eq_(list(session.execute(orders.select(), mapper=Order)),
[(42, None, None, 'hoho', None)])
- session.clear()
+ session.expunge_all()
# test assigning None to an unloaded deferred also works
o = session.query(Order).filter(Order.description == 'hoho').one()
session = create_session()
session.add(u)
session.flush()
- session.clear()
+ session.expunge_all()
u = session.query(User).get(u.id)
u.name = ''
session = create_session()
session.add(u)
session.flush()
- session.clear()
+ session.expunge_all()
id = m.primary_key_from_instance(u)
address_rows = addresses.select(addresses.c.id.in_([u.id])).execute().fetchall()
eq_(address_rows[0].values(), [u.id, u.foo_id, 'lala@hey.com'])
- session.clear()
+ session.expunge_all()
u = session.query(User).get(id)
assert u.name == 'imnew'
session = create_session()
session.add(u)
session.flush()
- session.clear()
+ session.expunge_all()
u = session.query(User).get(u.id)
session.delete(u)
# test insert ordering is maintained
assert names == ['user1', 'user2', 'user4', 'user5', 'user3']
- session.clear()
+ session.expunge_all()
sa.orm.clear_mappers()
session = create_session()
session.add(a1)
session.flush()
- session.clear()
+ session.expunge_all()
a1 = session.query(Address).get(a1.id)
u1 = session.query(User).get(u1.id)
a1.user = None
session.flush()
- session.clear()
+ session.expunge_all()
a1 = session.query(Address).get(a1.id)
u1 = session.query(User).get(u1.id)
assert a1.user is None
session = create_session()
session.add_all((a1, a2))
session.flush()
- session.clear()
+ session.expunge_all()
a1 = session.query(Address).get(a1.id)
a2 = session.query(Address).get(a2.id)
a1.user = None
a2.user = u1
session.flush()
- session.clear()
+ session.expunge_all()
a1 = session.query(Address).get(a1.id)
a2 = session.query(Address).get(a2.id)
session = create_session()
session.add_all((a1, u1, u2))
session.flush()
- session.clear()
+ session.expunge_all()
a1 = session.query(Address).get(a1.id)
u1 = session.query(User).get(u1.id)
a1.user = u2
session.flush()
- session.clear()
+ session.expunge_all()
a1 = session.query(Address).get(a1.id)
u1 = session.query(User).get(u1.id)
u2 = session.query(User).get(u2.id)
session = create_session()
session.add(u1)
session.flush()
- session.clear()
+ session.expunge_all()
a1 = session.query(Address).get(a1.id)
a1.user = None
session.flush()
- session.clear()
+ session.expunge_all()
assert session.query(Address).get(a1.id).user is None
assert session.query(User).get(u1.id).addresses == []
item.keywords.append(k2)
session.flush()
- session.clear()
+ session.expunge_all()
item = session.query(Item).get(item.id)
assert item.keywords == [k1, k2]
for clear in (False, True):
if clear:
- sess.clear()
+ sess.expunge_all()
eq_(sess.query(T).all(), [T(value=True, name="t1"), T(value=False, name="t2"), T(value=True, name="t3")])
if clear:
- sess.clear()
+ sess.expunge_all()
eq_(sess.query(T).filter(T.value==True).all(), [T(value=True, name="t1"),T(value=True, name="t3")])
if clear:
- sess.clear()
+ sess.expunge_all()
eq_(sess.query(T).filter(T.value==False).all(), [T(value=False, name="t2")])
t2 = sess.query(T).get(t2.id)
@testing.resolve_artifact_names
def test_manytomany(self):
mapper(T5, t5, properties={
- 't7s':relation(T7, secondary=t5t7, cascade="all, delete-orphan")
+ 't7s':relation(T7, secondary=t5t7, cascade="all")
})
mapper(T7, t7)
for x in [a1,a2,a3]:
sess.add(x)
sess.flush()
- sess.clear()
+ sess.expunge_all()
alist = sess.query(A).all()
self.assertEquals(
for x in [a1,a2,a3]:
sess.add(x)
sess.flush()
- sess.clear()
+ sess.expunge_all()
alist = sess.query(A).order_by(A.col1).all()
self.assertEquals(
for x in [a1,a2,b1, b2]:
sess.add(x)
sess.flush()
- sess.clear()
+ sess.expunge_all()
alist = sess.query(A).order_by(A.col1).all()
self.assertEquals(
for x in [a1,a2]:
sess.add(x)
sess.flush()
- sess.clear()
+ sess.expunge_all()
alist = sess.query(A).order_by(A.col1).all()
self.assertEquals(
metadata.drop_all()
assert_no_mappers()
+ def test_join_cache(self):
+ metadata = MetaData(testing.db)
+
+ table1 = Table("table1", metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', String(30))
+ )
+
+ table2 = Table("table2", metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', String(30)),
+ Column('t1id', Integer, ForeignKey('table1.id'))
+ )
+
+ class Foo(object):
+ pass
+
+ class Bar(object):
+ pass
+
+ mapper(Foo, table1, properties={
+ 'bars':relation(mapper(Bar, table2))
+ })
+ metadata.create_all()
+
+ session = sessionmaker()
+
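+ # with the internal join cache removed, repeated query.join() calls
+ # against an ad-hoc selectable should keep memory flat; profile_memory
+ # invokes go() repeatedly and checks that usage does not keep growing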
+ @profile_memory
+ def go():
+ s = table2.select()
+ sess = session()
+ sess.query(Foo).join((s, Foo.bars)).all()
+ sess.rollback()
+ try:
+ go()
+ finally:
+ metadata.drop_all()
+
+
def test_mutable_identity(self):
metadata = MetaData(testing.db)
LastEscape=datetime.datetime(2004, 7, 29, 5, 6, 7),
Admission=4.95,
)
- session.save(wap)
+ session.add(wap)
sdz = Zoo(Name =u'San Diego Zoo',
# This early date should play havoc with a number
# of implementations.
Opens = datetime.time(9, 0, 0),
Admission = 0,
)
- session.save(sdz)
+ session.add(sdz)
bio = Zoo(
Name = u'Montr\xe9al Biod\xf4me',
Opens = datetime.time(9, 0, 0),
Admission = 11.75,
)
- session.save(bio)
+ session.add(bio)
seaworld = Zoo(
Name =u'Sea_World', Admission = 60)
- session.save(seaworld)
+ session.add(seaworld)
# Let's add a crazy futuristic Zoo to test large date values.
lp = Zoo(Name =u'Luna Park',
Opens = datetime.time(0, 0, 0),
Admission = 134.95,
)
- session.save(lp)
+ session.add(lp)
session.flush()
# Animals
leopard = Animal(Species=u'Leopard', Lifespan=73.5,)
- session.save(leopard)
+ session.add(leopard)
leopard.ZooID = wap.ID
leopard.LastEscape = datetime.datetime(2004, 12, 21, 8, 15, 0, 999907)
- session.save(Animal(Species=u'Lion', ZooID=wap.ID))
- session.save(Animal(Species=u'Slug', Legs=1, Lifespan=.75))
- session.save(Animal(Species=u'Tiger', ZooID=sdz.ID))
+ session.add(Animal(Species=u'Lion', ZooID=wap.ID))
+ session.add(Animal(Species=u'Slug', Legs=1, Lifespan=.75))
+ session.add(Animal(Species=u'Tiger', ZooID=sdz.ID))
# Override Legs.default with itself just to make sure it works.
- session.save(Animal(Species=u'Bear', Legs=4))
- session.save(Animal(Species=u'Ostrich', Legs=2, Lifespan=103.2))
- session.save(Animal(Species=u'Centipede', Legs=100))
+ session.add(Animal(Species=u'Bear', Legs=4))
+ session.add(Animal(Species=u'Ostrich', Legs=2, Lifespan=103.2))
+ session.add(Animal(Species=u'Centipede', Legs=100))
- session.save(Animal(Species=u'Emperor Penguin', Legs=2, ZooID=seaworld.ID))
- session.save(Animal(Species=u'Adelie Penguin', Legs=2, ZooID=seaworld.ID))
+ session.add(Animal(Species=u'Emperor Penguin', Legs=2, ZooID=seaworld.ID))
+ session.add(Animal(Species=u'Adelie Penguin', Legs=2, ZooID=seaworld.ID))
- session.save(Animal(Species=u'Millipede', Legs=1000000, ZooID=sdz.ID))
+ session.add(Animal(Species=u'Millipede', Legs=1000000, ZooID=sdz.ID))
# Add a mother and child to test relationships
bai_yun = Animal(Species=u'Ape', Name=u'Bai Yun', Legs=2)
- session.save(bai_yun)
- session.save(Animal(Species=u'Ape', Name=u'Hua Mei', Legs=2,
+ session.add(bai_yun)
+ session.add(Animal(Species=u'Ape', Name=u'Hua Mei', Legs=2,
MotherID=bai_yun.ID))
session.flush()
session.commit()
def test_baseline_2_insert(self):
for x in xrange(ITERATIONS):
- session.save(Animal(Species=u'Tick', Name=u'Tick %d' % x, Legs=8))
+ session.add(Animal(Species=u'Tick', Name=u'Tick %d' % x, Legs=8))
session.flush()
def test_baseline_3_properties(self):
assert str(u) == str(u2) == str(u3)
assert u2.compile().params == {'id_param':7}
assert u3.compile().params == {'id_param':10}
+
+ def test_adapt_union(self):
+ u = union(t1.select().where(t1.c.col1==4), t1.select().where(t1.c.col1==5)).alias()
+
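+ # adapting t1 against the aliased union should return the union alias itself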
+ assert sql_util.ClauseAdapter(u).traverse(t1) is u
def test_binds(self):
"""test that unique bindparams change their name upon clone() to prevent conflicts"""
Column("spaces % more spaces", Integer),
)
metadata.create_all()
-
+
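+ # these DBAPIs run the statement through Python %-formatting, so the
+ # literal percent signs in these column names can break drop_all here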
+ @testing.crashes('mysql', 'mysqldb calls name % (params)')
+ @testing.crashes('postgres', 'postgres calls name % (params)')
def tearDownAll(self):
metadata.drop_all()
{'percent%':9, '%(oneofthese)s':9, 'spaces % more spaces':10},
{'percent%':11, '%(oneofthese)s':10, 'spaces % more spaces':9},
)
- eq_(
- percent_table.select().order_by(percent_table.c['%(oneofthese)s']).execute().fetchall(),
- [
- (5, 7, 12),
- (7, 8, 11),
- (9, 9, 10),
- (11, 10, 9)
- ]
- )
- result = percent_table.select().order_by(percent_table.c['%(oneofthese)s']).execute()
- row = result.fetchone()
- eq_(row[percent_table.c['percent%']], 5)
- eq_(row[percent_table.c['%(oneofthese)s']], 7)
- eq_(row[percent_table.c['spaces % more spaces']], 12)
- row = result.fetchone()
- eq_(row['percent%'], 7)
- eq_(row['%(oneofthese)s'], 8)
- eq_(row['spaces % more spaces'], 11)
- result.close()
+
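+ # run the same assertions against both the plain Table and an alias() of
+ # it, covering percent-sign and space quoting in both compilation paths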
+ for table in (percent_table, percent_table.alias()):
+ eq_(
+ table.select().order_by(table.c['%(oneofthese)s']).execute().fetchall(),
+ [
+ (5, 7, 12),
+ (7, 8, 11),
+ (9, 9, 10),
+ (11, 10, 9)
+ ]
+ )
+
+ eq_(
+ table.select().
+ where(table.c['spaces % more spaces'].in_([9, 10])).
+ order_by(table.c['%(oneofthese)s']).execute().fetchall(),
+ [
+ (9, 9, 10),
+ (11, 10, 9)
+ ]
+ )
+
+ result = table.select().order_by(table.c['%(oneofthese)s']).execute()
+ row = result.fetchone()
+ eq_(row[table.c['percent%']], 5)
+ eq_(row[table.c['%(oneofthese)s']], 7)
+ eq_(row[table.c['spaces % more spaces']], 12)
+ row = result.fetchone()
+ eq_(row['percent%'], 7)
+ eq_(row['%(oneofthese)s'], 8)
+ eq_(row['spaces % more spaces'], 11)
+ result.close()
+
percent_table.update().values({percent_table.c['%(oneofthese)s']:9, percent_table.c['spaces % more spaces']:15}).execute()
+
eq_(
percent_table.select().order_by(percent_table.c['percent%']).execute().fetchall(),
[
user = User('zbloguser', "Zblog User", "hello", group=administrator)
blog = Blog(owner=user)
blog.name = "this is a blog"
- s.save(user)
- s.save(blog)
+ s.add(user)
+ s.add(blog)
s.flush()
blog_id = blog.id
user_id = user.id
try:
blog = s.query(Blog).get(blog_id)
post = Post(headline="asdf asdf", summary="asdfasfd")
- s.save(post)
+ s.add(post)
post.blog_id=blog_id
post.blog = blog
assert post in blog.posts
post.blog = blog
user = s.query(User).get(user_id)
post.user = user
- s.save(post)
+ s.add(post)
s.flush()
- s.clear()
+ s.expunge_all()
user = s.query(User).get(user_id)
blog = s.query(Blog).get(blog_id)
comment.post = post
comment.user = user
s.flush()
- s.clear()
+ s.expunge_all()
assert s.query(Post).get(post.id) is not None