self.add(value)
def __ior__(self, other):
- if util.duck_type_collection(other) is not set:
+ if util.duck_type_collection(other) is not util.Set:
return NotImplemented
for value in other:
self.add(value)
self.discard(value)
def __isub__(self, other):
- if util.duck_type_collection(other) is not set:
+ if util.duck_type_collection(other) is not util.Set:
return NotImplemented
for value in other:
self.discard(value)
self.add(value)
def __iand__(self, other):
- if util.duck_type_collection(other) is not set:
+ if util.duck_type_collection(other) is not util.Set:
return NotImplemented
want, have = self.intersection(other), util.Set(self)
self.add(value)
def __ixor__(self, other):
- if util.duck_type_collection(other) is not set:
+ if util.duck_type_collection(other) is not util.Set:
return NotImplemented
want, have = self.symmetric_difference(other), util.Set(self)
import copy, inspect, sys, weakref
from sqlalchemy import exceptions, schema, util as sautil
-from sqlalchemy.util import attrgetter
+from sqlalchemy.util import attrgetter, Set
__all__ = ['collection', 'collection_adapter',
def _tidy(fn):
setattr(fn, '_sa_instrumented', True)
- fn.__doc__ = getattr(getattr(set, fn.__name__), '__doc__')
+ fn.__doc__ = getattr(getattr(Set, fn.__name__), '__doc__')
Unspecified=object()
_tidy(add)
return add
- def discard(fn):
- def discard(self, value, _sa_initiator=None):
- # testlib.pragma exempt:__hash__
- if value in self:
- __del(self, value, _sa_initiator)
- # testlib.pragma exempt:__hash__
- fn(self, value)
- _tidy(discard)
- return discard
+ if sys.version_info < (2, 4):
+ def discard(fn):
+ def discard(self, value, _sa_initiator=None):
+ if value in self:
+ self.remove(value, _sa_initiator)
+ _tidy(discard)
+ return discard
+ else:
+ def discard(fn):
+ def discard(self, value, _sa_initiator=None):
+ # testlib.pragma exempt:__hash__
+ if value in self:
+ __del(self, value, _sa_initiator)
+ # testlib.pragma exempt:__hash__
+ fn(self, value)
+ _tidy(discard)
+ return discard
def remove(fn):
def remove(self, value, _sa_initiator=None):
def __ior__(fn):
def __ior__(self, value):
- if sautil.duck_type_collection(value) is not set:
+ if sautil.duck_type_collection(value) is not Set:
return NotImplemented
for item in value:
if item not in self:
def __isub__(fn):
def __isub__(self, value):
- if sautil.duck_type_collection(value) is not set:
+ if sautil.duck_type_collection(value) is not Set:
return NotImplemented
for item in value:
self.discard(item)
def intersection_update(fn):
def intersection_update(self, other):
- want, have = self.intersection(other), sautil.Set(self)
+ want, have = self.intersection(other), Set(self)
remove, add = have - want, want - have
for item in remove:
def __iand__(fn):
def __iand__(self, other):
- if sautil.duck_type_collection(other) is not set:
+ if sautil.duck_type_collection(other) is not Set:
return NotImplemented
- want, have = self.intersection(other), sautil.Set(self)
+ want, have = self.intersection(other), Set(self)
remove, add = have - want, want - have
for item in remove:
def symmetric_difference_update(fn):
def symmetric_difference_update(self, other):
- want, have = self.symmetric_difference(other), sautil.Set(self)
+ want, have = self.symmetric_difference(other), Set(self)
remove, add = have - want, want - have
for item in remove:
def __ixor__(fn):
def __ixor__(self, other):
- if sautil.duck_type_collection(other) is not set:
+ if sautil.duck_type_collection(other) is not Set:
return NotImplemented
- want, have = self.symmetric_difference(other), sautil.Set(self)
+ want, have = self.symmetric_difference(other), Set(self)
remove, add = have - want, want - have
for item in remove:
'remover': 'remove',
'iterator': '__iter__', }
-class InstrumentedSet(sautil.Set):
+class InstrumentedSet(Set):
"""An instrumented version of the built-in set (or Set)."""
__instrumentation__ = {
__canned_instrumentation = {
list: InstrumentedList,
- sautil.Set: InstrumentedSet,
+ Set: InstrumentedSet,
dict: InstrumentedDict,
}
'remover': 'remove',
'iterator': '__iter__',
'_decorators': _list_decorators(), },
- sautil.Set: { 'appender': 'add',
- 'remover': 'remove',
- 'iterator': '__iter__',
- '_decorators': _set_decorators(), },
+ Set: { 'appender': 'add',
+ 'remover': 'remove',
+ 'iterator': '__iter__',
+ '_decorators': _set_decorators(), },
# decorators are required for dicts and object collections.
dict: { 'iterator': 'itervalues',
'_decorators': _dict_decorators(), },
Set = set
set_types = set, sets.Set
except NameError:
- Set = sets.Set
set_types = sets.Set,
+ # layer some of __builtin__.set's binop behavior onto sets.Set
+ class Set(sets.Set):
+ def _binary_sanity_check(self, other):
+ pass
+
+ def issubset(self, iterable):
+ other = type(self)(iterable)
+ return sets.Set.issubset(self, other)
+ def __le__(self, other):
+ sets.Set._binary_sanity_check(self, other)
+ return sets.Set.__le__(self, other)
+ def issuperset(self, iterable):
+ other = type(self)(iterable)
+ return sets.Set.issuperset(self, other)
+ def __ge__(self, other):
+ sets.Set._binary_sanity_check(self, other)
+ return sets.Set.__ge__(self, other)
+
+ # lt and gt still require a BaseSet
+ def __lt__(self, other):
+ sets.Set._binary_sanity_check(self, other)
+ return sets.Set.__lt__(self, other)
+ def __gt__(self, other):
+ sets.Set._binary_sanity_check(self, other)
+ return sets.Set.__gt__(self, other)
+
+ def __ior__(self, other):
+ if not isinstance(other, sets.BaseSet):
+ return NotImplemented
+ return sets.Set.__ior__(self, other)
+ def __iand__(self, other):
+ if not isinstance(other, sets.BaseSet):
+ return NotImplemented
+ return sets.Set.__iand__(self, other)
+ def __ixor__(self, other):
+ if not isinstance(other, sets.BaseSet):
+ return NotImplemented
+ return sets.Set.__ixor__(self, other)
+ def __isub__(self, other):
+ if not isinstance(other, sets.BaseSet):
+ return NotImplemented
+ return sets.Set.__isub__(self, other)
try:
import cPickle as pickle
"""
if hasattr(specimen, '__emulates__'):
- return specimen.__emulates__
+ # canonicalize set vs sets.Set to a standard: util.Set
+ if (specimen.__emulates__ is not None and
+ issubclass(specimen.__emulates__, set_types)):
+ return Set
+ else:
+ return specimen.__emulates__
isa = isinstance(specimen, type) and issubclass or isinstance
if isa(specimen, list): return list
- if isa(specimen, Set): return Set
+ if isa(specimen, set_types): return Set
if isa(specimen, dict): return dict
if hasattr(specimen, 'append'):
def __init__(self, ____sequence=None, **kwargs):
self._list = []
if ____sequence is None:
- self.update(**kwargs)
+ if kwargs:
+ self.update(**kwargs)
else:
self.update(____sequence, **kwargs)
__isub__ = difference_update
+ if hasattr(Set, '__getstate__'):
+ def __getstate__(self):
+ base = Set.__getstate__(self)
+ return base, self._list
+
+ def __setstate__(self, state):
+ Set.__setstate__(self, state[0])
+ self._list = state[1]
class IdentitySet(object):
"""A set that considers only object id() for uniqueness.
class IdentitySetTest(unittest.TestCase):
def assert_eq(self, identityset, expected_iterable):
- found = sorted(list(identityset))
- expected = sorted(expected_iterable)
+ expected = sorted([id(o) for o in expected_iterable])
+ found = sorted([id(o) for o in identityset])
self.assertEquals(found, expected)
def test_init(self):
import testenv; testenv.configure_for_tests()
-import unittest, doctest
+import doctest, sys, unittest
def suite():
unittest_modules = ['ext.activemapper',
'ext.assignmapper',
'ext.orderinglist',
'ext.associationproxy']
- doctest_modules = ['sqlalchemy.ext.sqlsoup']
+
+ if sys.version_info >= (2, 4):
+ doctest_modules = ['sqlalchemy.ext.sqlsoup']
+ else:
+ doctest_modules = []
alltests = unittest.TestSuite()
for name in unittest_modules:
import testenv; testenv.configure_for_tests()
+import sys
+from operator import and_
from sqlalchemy import *
import sqlalchemy.exceptions as exceptions
from sqlalchemy.orm import create_session, mapper, relation, \
import sqlalchemy.orm.collections as collections
from sqlalchemy.orm.collections import collection
from sqlalchemy import util
-from operator import and_
from testlib import *
+try:
+ py_set = __builtins__.set
+except AttributeError:
+ import sets
+ py_set = sets.Set
+
class Canary(interfaces.AttributeExtension):
def __init__(self):
self.data = set()
def test_set_emulates(self):
class SetIsh(object):
- __emulates__ = set
+ __emulates__ = py_set
def __init__(self):
self.data = set()
def add(self, item):
control.update(d)
assert_eq()
- kw = dict([(ee.a, ee) for ee in [e, creator()]])
- direct.update(**kw)
- control.update(**kw)
- assert_eq()
+ if sys.version_info >= (2, 4):
+ kw = dict([(ee.a, ee) for ee in [e, creator()]])
+ direct.update(**kw)
+ control.update(**kw)
+ assert_eq()
def _test_dict_bulk(self, typecallable, creator=dictable_entity):
class Foo(object):
Column('c1', Integer, primary_key=True),
Column('c2', String(30)))
- @profiling.profiled('ctest_insert', call_range=(40, 50), always=True)
+ @profiling.function_call_count(42, {'2.3': 44})
def test_insert(self):
t1.insert().compile()
- @profiling.profiled('ctest_update', call_range=(40, 50), always=True)
+ @profiling.function_call_count(42, {'2.3': 47})
def test_update(self):
t1.update().compile()
# TODO: this is alittle high
- @profiling.profiled('ctest_select', call_range=(110, 140), always=True)
+ @profiling.function_call_count(125, versions={'2.3': 180})
def test_select(self):
s = select([t1], t1.c.c2==t2.c.c1)
s.compile()
finally:
bar.drop()
+def _missing_decimal():
+ """Python implementation supports decimals"""
+ try:
+ import decimal
+ return False
+ except ImportError:
+ return True
+
class NumericTest(AssertMixin):
def setUpAll(self):
global numeric_table, metadata
def tearDown(self):
numeric_table.delete().execute()
+ @testing.fails_if(_missing_decimal)
def test_decimal(self):
from decimal import Decimal
numeric_table.insert().execute(numericcol=3.5, floatcol=5.6, ncasdec=12.4, fcasdec=15.75)
-import new
+import itertools, new
__all__ = 'set', 'sorted', '_function_named'
try:
set = set
except NameError:
- from sets import Set as set
+ import sets
+
+ # keep this in sync with sqlalchemy.util.Set
+ # can't just import it in testlib because of coverage, load order, etc.
+ class set(sets.Set):
+ def _binary_sanity_check(self, other):
+ pass
+
+ def issubset(self, iterable):
+ other = type(self)(iterable)
+ return sets.Set.issubset(self, other)
+ def __le__(self, other):
+ sets.Set._binary_sanity_check(self, other)
+ return sets.Set.__le__(self, other)
+ def issuperset(self, iterable):
+ other = type(self)(iterable)
+ return sets.Set.issuperset(self, other)
+ def __ge__(self, other):
+ sets.Set._binary_sanity_check(self, other)
+ return sets.Set.__ge__(self, other)
+
+ # lt and gt still require a BaseSet
+ def __lt__(self, other):
+ sets.Set._binary_sanity_check(self, other)
+ return sets.Set.__lt__(self, other)
+ def __gt__(self, other):
+ sets.Set._binary_sanity_check(self, other)
+ return sets.Set.__gt__(self, other)
+
+ def __ior__(self, other):
+ if not isinstance(other, sets.BaseSet):
+ return NotImplemented
+ return sets.Set.__ior__(self, other)
+ def __iand__(self, other):
+ if not isinstance(other, sets.BaseSet):
+ return NotImplemented
+ return sets.Set.__iand__(self, other)
+ def __ixor__(self, other):
+ if not isinstance(other, sets.BaseSet):
+ return NotImplemented
+ return sets.Set.__ixor__(self, other)
+ def __isub__(self, other):
+ if not isinstance(other, sets.BaseSet):
+ return NotImplemented
+ return sets.Set.__isub__(self, other)
try:
sorted = sorted
except NameError:
- def sorted(iterable):
- return list(iterable).sort()
+ def sorted(iterable, cmp=None):
+ l = list(iterable)
+ if cmp:
+ l.sort(cmp)
+ else:
+ l.sort()
+ return l
def _function_named(fn, newname):
try:
raise AssertionError(
"Function call count %s not within %s%% "
"of expected %s. (Python version %s)" % (
- calls, variance, count, py_version))
+ calls, (variance * 100), count, py_version))
return result
finally:
if os.path.exists(filename):
# sugar ('testing.db'); set here by config() at runtime
db = None
+def fails_if(callable_):
+ """Mark a test as expected to fail if callable_ returns True.
+
+ If the callable returns false, the test is run and reported as normal.
+ However if the callable returns true, the test is expected to fail and the
+ unit test logic is inverted: if the test fails, a success is reported. If
+ the test succeeds, a failure is reported.
+ """
+
+ docstring = getattr(callable_, '__doc__', callable_.__name__)
+ description = docstring.split('\n')[0]
+
+ def decorate(fn):
+ fn_name = fn.__name__
+ def maybe(*args, **kw):
+ if not callable_():
+ return fn(*args, **kw)
+ else:
+ try:
+ fn(*args, **kw)
+ except Exception, ex:
+ print ("'%s' failed as expected (condition: %s): %s " % (
+ fn_name, description, str(ex)))
+ return True
+ else:
+ raise AssertionError(
+ "Unexpected success for '%s' (condition: %s)" %
+ (fn_name, description))
+ return _function_named(maybe, fn_name)
+ return decorate
+
def fails_on(*dbs):
"""Mark a test as expected to fail on one or more database implementations.