-"""Classes used in pickling tests, need to be at the module level for unpickling."""
+"""Classes used in pickling tests, need to be at the module level for
+unpickling.
+"""
from . import fixtures
+
class User(fixtures.ComparableEntity):
pass
+
class Order(fixtures.ComparableEntity):
pass
+
class Dingaling(fixtures.ComparableEntity):
pass
+
class EmailUser(User):
pass
+
class Address(fixtures.ComparableEntity):
pass
+
# TODO: these are kind of arbitrary....
class Child1(fixtures.ComparableEntity):
pass
+
class Child2(fixtures.ComparableEntity):
pass
+
class Parent(fixtures.ComparableEntity):
pass
+
class Screen(object):
+
def __init__(self, obj, parent=None):
self.obj = obj
self.parent = parent
+
class Foo(object):
+
def __init__(self, moredata):
self.data = 'im data'
self.stuff = 'im stuff'
self.moredata = moredata
+
__hash__ = object.__hash__
+
def __eq__(self, other):
return other.data == self.data and \
other.stuff == self.stuff and \
class Bar(object):
+
def __init__(self, x, y):
self.x = x
self.y = y
+
__hash__ = object.__hash__
+
def __eq__(self, other):
return other.__class__ is self.__class__ and \
other.x == self.x and \
other.y == self.y
+
def __str__(self):
return "Bar(%d, %d)" % (self.x, self.y)
+
class OldSchool:
+
def __init__(self, x, y):
self.x = x
self.y = y
+
def __eq__(self, other):
return other.__class__ is self.__class__ and \
other.x == self.x and \
other.y == self.y
+
class OldSchoolWithoutCompare:
+
def __init__(self, x, y):
self.x = x
self.y = y
+
class BarWithoutCompare(object):
+
def __init__(self, x, y):
self.x = x
self.y = y
+
def __str__(self):
return "Bar(%d, %d)" % (self.x, self.y)
class NotComparable(object):
+
def __init__(self, data):
self.data = data
class BrokenComparable(object):
+
def __init__(self, data):
self.data = data
def __ne__(self, other):
raise NotImplementedError
-
_current_test = None
+
def profiled(target=None, **target_opts):
"""Function profiling.
else:
stats.print_stats()
- print_callers = target_opts.get('print_callers',
- profile_config['print_callers'])
+ print_callers = target_opts.get(
+ 'print_callers', profile_config['print_callers'])
if print_callers:
stats.print_callers()
- print_callees = target_opts.get('print_callees',
- profile_config['print_callees'])
+ print_callees = target_opts.get(
+ 'print_callees', profile_config['print_callees'])
if print_callees:
stats.print_callees()
"""
def __init__(self, filename):
- self.write = config.options is not None and config.options.write_profiles
+ self.write = (
+ config.options is not None and
+ config.options.write_profiles
+ )
self.fname = os.path.abspath(filename)
self.short_fname = os.path.split(self.fname)[-1]
- self.data = collections.defaultdict(lambda: collections.defaultdict(dict))
+ self.data = collections.defaultdict(
+ lambda: collections.defaultdict(dict))
self._read()
if self.write:
# rewrite for the case where features changed,
def has_stats(self):
test_key = _current_test
- return test_key in self.data and self.platform_key in self.data[test_key]
+ return (
+ test_key in self.data and
+ self.platform_key in self.data[test_key]
+ )
def result(self, callcount):
test_key = _current_test
per_platform['current_count'] += 1
return result
-
def _header(self):
return \
"# %s\n"\
"# assertions are raised if the counts do not match.\n"\
"# \n"\
"# To add a new callcount test, apply the function_call_count \n"\
- "# decorator and re-run the tests using the --write-profiles option - \n"\
- "# this file will be rewritten including the new count.\n"\
+ "# decorator and re-run the tests using the --write-profiles \n"\
+ "# option - this file will be rewritten including the new count.\n"\
"# \n"\
"" % (self.fname)
test_key, platform_key, counts = line.split()
per_fn = self.data[test_key]
per_platform = per_fn[platform_key]
- per_platform['counts'] = [int(count) for count in counts.split(",")]
+ c = [int(count) for count in counts.split(",")]
+ per_platform['counts'] = c
per_platform['lineno'] = lineno + 1
per_platform['current_count'] = 0
profile_f.close()
profile_f.write("\n# TEST: %s\n\n" % test_key)
for platform_key in sorted(per_fn):
per_platform = per_fn[platform_key]
- profile_f.write(
- "%s %s %s\n" % (
- test_key,
- platform_key, ",".join(str(count) for count in per_platform['counts'])
- )
- )
+ c = ",".join(str(count) for count in per_platform['counts'])
+ profile_f.write("%s %s %s\n" % (test_key, platform_key, c))
profile_f.close()
from sqlalchemy.util.compat import update_wrapper
+
def function_call_count(variance=0.05):
"""Assert a target for a test case's function call count.
def decorate(fn):
def wrap(*args, **kw):
-
if cProfile is None:
raise SkipTest("cProfile is not installed")
gc_collect()
-
timespent, load_stats, fn_result = _profile(
fn, *args, **kw
)
if abs(callcount - expected_count) > deviance:
raise AssertionError(
"Adjusted function call count %s not within %s%% "
- "of expected %s. (Delete line %d of file %s to regenerate "
- "this callcount, when tests are run with --write-profiles.)"
+ "of expected %s. (Delete line %d of file %s to "
+ "regenerate this callcount, when tests are run "
+ "with --write-profiles.)"
% (
callcount, (variance * 100),
expected_count, line_no,
ended = time.time()
return ended - began, load_stats, locals()['result']
-
from . import exclusions
+
class Requirements(object):
def __init__(self, db, config):
self.db = db
"""
return exclusions.open()
-
@property
def datetime(self):
"""target dialect supports representation of Python
@property
def empty_strings_varchar(self):
- """target database can persist/return an empty string with a varchar."""
+ """target database can persist/return an empty string with a
+ varchar.
+ """
return exclusions.open()
@property
return exclusions.open()
-
@property
def update_from(self):
"""Target must support UPDATE..FROM syntax"""
import nose
+
def main():
nose.main(addplugins=[NoseSQLAlchemy()])
table_options = {}
+
def Table(*args, **kw):
"""A schema.Table wrapper/hook for dialect-specific tweaks."""
event.listen(col, 'after_parent_attach', add_seq, propagate=True)
return col
+
def _truncate_name(dialect, name):
if len(name) > dialect.max_identifier_length:
return name[0:max(dialect.max_identifier_length - 6, 0)] + \
"_" + hex(hash(name) % 64)[2:]
else:
return name
-
(1, 'some data')
)
-
@requirements.create_table
@util.provide_metadata
def test_create_table(self):
)
self._simple_roundtrip()
-
@requirements.drop_table
@util.provide_metadata
def test_drop_table(self):
)
-__all__ = ('TableDDLTest', )
\ No newline at end of file
+__all__ = ('TableDDLTest', )
from ..schema import Table, Column
+
class LastrowidTest(fixtures.TablesTest):
run_deletes = 'each'
else:
engine = config.db
-
r = engine.execute(
self.tables.autoinc_pk.insert(),
data="some data"
assert r.is_insert
assert not r.returns_rows
+
class ReturningTest(fixtures.TablesTest):
run_deletes = 'each'
__requires__ = 'returning', 'autoincrement_insert'
__all__ = ('LastrowidTest', 'InsertBehaviorTest', 'ReturningTest')
-
-
metadata, users = None, None
+
class HasTableTest(fixtures.TablesTest):
@classmethod
def define_tables(cls, metadata):
assert config.db.dialect.has_table(conn, "test_table")
assert not config.db.dialect.has_table(conn, "nonexistent_table")
+
class HasSequenceTest(fixtures.TestBase):
__requires__ = 'sequences',
self._test_get_table_oid('users', schema='test_schema')
-__all__ = ('ComponentReflectionTest', 'HasSequenceTest', 'HasTableTest')
\ No newline at end of file
+__all__ = ('ComponentReflectionTest', 'HasSequenceTest', 'HasTableTest')
from ..schema import Table, Column
import datetime
+
class _UnicodeFixture(object):
__requires__ = 'unicode_data',
for row in rows:
assert isinstance(row[0], unicode)
-
def _test_empty_strings(self):
unicode_table = self.tables.unicode_table
).first()
eq_(row, (u'',))
+
class UnicodeVarcharTest(_UnicodeFixture, fixtures.TablesTest):
__requires__ = 'unicode_data',
datatype = Unicode(255)
-
@requirements.empty_strings_varchar
def test_empty_strings_varchar(self):
self._test_empty_strings()
+
class UnicodeTextTest(_UnicodeFixture, fixtures.TablesTest):
__requires__ = 'unicode_data', 'text_type'
foo.create(config.db)
foo.drop(config.db)
+
class _DateFixture(object):
compare = None
datatype = DateTime
data = datetime.datetime(2012, 10, 15, 12, 57, 18)
+
class DateTimeMicrosecondsTest(_DateFixture, fixtures.TablesTest):
__requires__ = 'datetime_microseconds',
datatype = DateTime
data = datetime.datetime(2012, 10, 15, 12, 57, 18, 396)
+
class TimeTest(_DateFixture, fixtures.TablesTest):
__requires__ = 'time',
datatype = Time
data = datetime.time(12, 57, 18)
+
class TimeMicrosecondsTest(_DateFixture, fixtures.TablesTest):
__requires__ = 'time_microseconds',
datatype = Time
data = datetime.time(12, 57, 18, 396)
+
class DateTest(_DateFixture, fixtures.TablesTest):
__requires__ = 'date',
datatype = Date
data = datetime.date(2012, 10, 15)
+
class DateTimeCoercedToDateTimeTest(_DateFixture, fixtures.TablesTest):
__requires__ = 'date',
datatype = Date
data = datetime.datetime(2012, 10, 15, 12, 57, 18)
compare = datetime.date(2012, 10, 15)
+
class DateTimeHistoricTest(_DateFixture, fixtures.TablesTest):
__requires__ = 'datetime_historic',
datatype = DateTime
data = datetime.datetime(1850, 11, 10, 11, 52, 35)
+
class DateHistoricTest(_DateFixture, fixtures.TablesTest):
__requires__ = 'date_historic',
datatype = Date
'DateTimeHistoricTest', 'DateTimeCoercedToDateTimeTest',
'TimeMicrosecondsTest', 'TimeTest', 'DateTimeMicrosecondsTest',
'DateHistoricTest', 'StringTest')
-
-
-
from .. import fixtures, config
-from ..config import requirements
from ..assertions import eq_
-from .. import engines
-from sqlalchemy import Integer, String, select
+from sqlalchemy import Integer, String
from ..schema import Table, Column
]
)
-__all__ = ('SimpleUpdateDeleteTest', )
\ No newline at end of file
+__all__ = ('SimpleUpdateDeleteTest', )
else:
# assume CPython - straight gc.collect, lazy_gc() is a pass
gc_collect = gc.collect
+
def lazy_gc():
pass
+
def picklers():
picklers = set()
# Py2K
).to_integral(decimal.ROUND_FLOOR) / \
pow(10, prec)
+
class RandomSet(set):
def __iter__(self):
l = list(set.__iter__(self))
def copy(self):
return RandomSet(self)
+
def conforms_partial_ordering(tuples, sorted_elements):
"""True if the given sorting conforms to the given partial ordering."""
else:
return True
+
def all_partial_orderings(tuples, elements):
edges = defaultdict(set)
for parent, child in tuples:
return fn
-
def run_as_contextmanager(ctx, fn, *arg, **kw):
"""Run the given function under the given contextmanager,
simulating the behavior of 'with' to support older
else:
return raise_
+
def rowset(results):
"""Converts the results of sql execution into a plain set of column tuples.
metadata.drop_all()
self.metadata = prev_meta
+
class adict(dict):
"""Dict keys available as attributes. Shadows."""
def __getattribute__(self, key):
def get_all(self, *keys):
return tuple([self[key] for key in keys])
-
-
from .. import exc as sa_exc
from .. import util
+
def testing_warn(msg, stacklevel=3):
"""Replaces sqlalchemy.util.warn during tests."""
else:
warnings.warn_explicit(msg, filename, lineno)
+
def resetwarnings():
"""Reset warning behavior to testing defaults."""
warnings.filterwarnings('error', category=sa_exc.SADeprecationWarning)
warnings.filterwarnings('error', category=sa_exc.SAWarning)
+
def assert_warnings(fn, warnings):
"""Assert that each of the given warnings are emitted by fn."""
canary = []
orig_warn = util.warn
+
def capture_warnings(*args, **kw):
orig_warn(*args, **kw)
popwarn = warnings.pop(0)