from sqlalchemy.testing.util import round_decimal
from sqlalchemy.testing import fixtures
+
class AdaptTest(fixtures.TestBase):
def _all_dialect_modules(self):
def _types_for_mod(self, mod):
for key in dir(mod):
typ = getattr(mod, key)
- if not isinstance(typ, type) or not issubclass(typ, types.TypeEngine):
+ if not isinstance(
+ typ,
+ type) or not issubclass(
+ typ,
+ types.TypeEngine):
continue
yield typ
(CLOB, "CLOB"),
(VARCHAR(10), ("VARCHAR(10)", "VARCHAR(10 CHAR)")),
(NVARCHAR(10), ("NVARCHAR(10)", "NATIONAL VARCHAR(10)",
- "NVARCHAR2(10)")),
+ "NVARCHAR2(10)")),
(CHAR, "CHAR"),
(NCHAR, ("NCHAR", "NATIONAL CHAR")),
(BLOB, ("BLOB", "BLOB SUB_TYPE 0")),
try:
compiled = types.to_instance(type_).\
- compile(dialect=dialect)
+ compile(dialect=dialect)
except NotImplementedError:
continue
yield False, typ, up_adaptions
for subcl in typ.__subclasses__():
if subcl is not typ and \
- typ is not TypeDecorator and \
- "sqlalchemy" in subcl.__module__:
+ typ is not TypeDecorator and \
+ "sqlalchemy" in subcl.__module__:
yield True, subcl, [typ]
for is_down_adaption, typ, target_adaptions in adaptions():
assert t1._type_affinity is String
assert t1.dialect_impl(d)._type_affinity is postgresql.UUID
+
class PickleMetadataTest(fixtures.TestBase):
def testmeta(self):
raw_dialect_impl = raw_impl.dialect_impl(dialect_)
dec_dialect_impl = dec_type.dialect_impl(dialect_)
eq_(dec_dialect_impl.__class__, MyType)
- eq_(raw_dialect_impl.__class__, dec_dialect_impl.impl.__class__)
+ eq_(raw_dialect_impl.__class__,
+ dec_dialect_impl.impl.__class__)
self.assert_compile(
MyType(**kw),
String().dialect_impl(dialect=sl).__class__
)
eq_(
- t.dialect_impl(dialect=pg).impl.__class__,
- Float().dialect_impl(pg).__class__
+ t.dialect_impl(dialect=pg).impl.__class__,
+ Float().dialect_impl(pg).__class__
)
def test_type_decorator_repr(self):
super(MyType, self).__init__()
self.foo = foo
self.dialect_specific_args = kwargs
+
def adapt(self, cls):
return cls(foo=self.foo, **self.dialect_specific_args)
t = MyType(bar='bar')
eq_(a.foo, 'foo')
eq_(a.dialect_specific_args['bar'], 'bar')
-
@classmethod
def define_tables(cls, metadata):
class MyType(types.UserDefinedType):
def get_col_spec(self):
return "VARCHAR(100)"
+
def bind_processor(self, dialect):
def process(value):
return "BIND_IN" + value
return process
+
def result_processor(self, dialect, coltype):
def process(value):
return value + "BIND_OUT"
return process
+
def adapt(self, typeobj):
return typeobj()
class MyDecoratedType(types.TypeDecorator):
impl = String
+
def bind_processor(self, dialect):
impl_processor = super(MyDecoratedType, self).bind_processor(dialect)\
- or (lambda value: value)
+ or (lambda value: value)
+
def process(value):
return "BIND_IN" + impl_processor(value)
return process
+
def result_processor(self, dialect, coltype):
impl_processor = super(MyDecoratedType, self).result_processor(dialect, coltype)\
- or (lambda value: value)
+ or (lambda value: value)
+
def process(value):
return impl_processor(value) + "BIND_OUT"
return process
+
def copy(self):
return MyDecoratedType()
def bind_processor(self, dialect):
impl_processor = super(MyUnicodeType, self).bind_processor(dialect)\
- or (lambda value: value)
+ or (lambda value: value)
def process(value):
return "BIND_IN" + impl_processor(value)
def result_processor(self, dialect, coltype):
impl_processor = super(MyUnicodeType, self).result_processor(dialect, coltype)\
- or (lambda value: value)
+ or (lambda value: value)
+
def process(value):
return impl_processor(value) + "BIND_OUT"
return process
return MyUnicodeType(self.impl.length)
Table('users', metadata,
- Column('user_id', Integer, primary_key=True),
- # totall custom type
- Column('goofy', MyType, nullable=False),
+ Column('user_id', Integer, primary_key=True),
+ # totall custom type
+ Column('goofy', MyType, nullable=False),
- # decorated type with an argument, so its a String
- Column('goofy2', MyDecoratedType(50), nullable=False),
+ # decorated type with an argument, so its a String
+ Column('goofy2', MyDecoratedType(50), nullable=False),
+
+ Column('goofy4', MyUnicodeType(50), nullable=False),
+ Column('goofy7', MyNewUnicodeType(50), nullable=False),
+ Column('goofy8', MyNewIntType, nullable=False),
+ Column('goofy9', MyNewIntSubClass, nullable=False),
+ )
- Column('goofy4', MyUnicodeType(50), nullable=False),
- Column('goofy7', MyNewUnicodeType(50), nullable=False),
- Column('goofy8', MyNewIntType, nullable=False),
- Column('goofy9', MyNewIntSubClass, nullable=False),
- )
class TypeCoerceCastTest(fixtures.TablesTest):
cls.MyType = MyType
Table('t', metadata,
- Column('data', String(50))
- )
+ Column('data', String(50))
+ )
@testing.fails_on("oracle",
- "oracle doesn't like CAST in the VALUES of an INSERT")
+ "oracle doesn't like CAST in the VALUES of an INSERT")
def test_insert_round_trip_cast(self):
self._test_insert_round_trip(cast)
)
@testing.fails_on("oracle",
- "ORA-00906: missing left parenthesis - "
- "seems to be CAST(:param AS type)")
+ "ORA-00906: missing left parenthesis - "
+ "seems to be CAST(:param AS type)")
def test_coerce_from_nulltype_cast(self):
self._test_coerce_from_nulltype(cast)
)
@testing.fails_on("oracle",
- "oracle doesn't like CAST in the VALUES of an INSERT")
+ "oracle doesn't like CAST in the VALUES of an INSERT")
def test_vs_non_coerced_cast(self):
self._test_vs_non_coerced(cast)
t.insert().values(data=coerce_fn('d1', MyType)).execute()
- eq_(
- select([t.c.data, coerce_fn(t.c.data, MyType)]).execute().fetchall(),
- [('BIND_INd1', 'BIND_INd1BIND_OUT')]
- )
+ eq_(select([t.c.data, coerce_fn(t.c.data, MyType)]).execute().fetchall(), [
+ ('BIND_INd1', 'BIND_INd1BIND_OUT')])
@testing.fails_on("oracle",
- "oracle doesn't like CAST in the VALUES of an INSERT")
+ "oracle doesn't like CAST in the VALUES of an INSERT")
def test_vs_non_coerced_alias_cast(self):
self._test_vs_non_coerced_alias(cast)
eq_(
select([t.c.data, coerce_fn(t.c.data, MyType)]).
- alias().select().execute().fetchall(),
+ alias().select().execute().fetchall(),
[('BIND_INd1', 'BIND_INd1BIND_OUT')]
)
@testing.fails_on("oracle",
- "oracle doesn't like CAST in the VALUES of an INSERT")
+ "oracle doesn't like CAST in the VALUES of an INSERT")
def test_vs_non_coerced_where_cast(self):
self._test_vs_non_coerced_where(cast)
# coerce on left side
eq_(
- select([t.c.data, coerce_fn(t.c.data, MyType)]).\
- where(coerce_fn(t.c.data, MyType) == 'd1').\
- execute().fetchall(),
+ select([t.c.data, coerce_fn(t.c.data, MyType)]).
+ where(coerce_fn(t.c.data, MyType) == 'd1').
+ execute().fetchall(),
[('BIND_INd1', 'BIND_INd1BIND_OUT')]
)
# coerce on right side
eq_(
- select([t.c.data, coerce_fn(t.c.data, MyType)]).\
- where(t.c.data == coerce_fn('d1', MyType)).\
- execute().fetchall(),
+ select([t.c.data, coerce_fn(t.c.data, MyType)]).
+ where(t.c.data == coerce_fn('d1', MyType)).
+ execute().fetchall(),
[('BIND_INd1', 'BIND_INd1BIND_OUT')]
)
@testing.fails_on("oracle",
- "oracle doesn't like CAST in the VALUES of an INSERT")
+ "oracle doesn't like CAST in the VALUES of an INSERT")
def test_coerce_none_cast(self):
self._test_coerce_none(cast)
t = self.tables.t
t.insert().values(data=coerce_fn('d1', MyType)).execute()
eq_(
- select([t.c.data, coerce_fn(t.c.data, MyType)]).\
- where(t.c.data == coerce_fn(None, MyType)).\
- execute().fetchall(),
+ select([t.c.data, coerce_fn(t.c.data, MyType)]).
+ where(t.c.data == coerce_fn(None, MyType)).
+ execute().fetchall(),
[]
)
eq_(
- select([t.c.data, coerce_fn(t.c.data, MyType)]).\
- where(coerce_fn(t.c.data, MyType) == None).\
- execute().fetchall(),
+ select([t.c.data, coerce_fn(t.c.data, MyType)]).
+ where(coerce_fn(t.c.data, MyType) == None).
+ execute().fetchall(),
[]
)
@testing.fails_on("oracle",
- "oracle doesn't like CAST in the VALUES of an INSERT")
+ "oracle doesn't like CAST in the VALUES of an INSERT")
def test_resolve_clause_element_cast(self):
self._test_resolve_clause_element(cast)
)
@testing.fails_on("oracle",
- "ORA-00906: missing left parenthesis - "
- "seems to be CAST(:param AS type)")
+ "ORA-00906: missing left parenthesis - "
+ "seems to be CAST(:param AS type)")
def test_cast_existing_typed(self):
MyType = self.MyType
coerce_fn = cast
def get_col_spec(self):
return "UTYPEONE"
+
def bind_processor(self, dialect):
def process(value):
return value + "UONE"
def get_col_spec(self):
return "UTYPETWO"
+
def bind_processor(self, dialect):
def process(value):
return value + "UTWO"
self.UTypeTwo = UTypeTwo
self.UTypeThree = UTypeThree
self.variant = self.UTypeOne().with_variant(
- self.UTypeTwo(), 'postgresql')
+ self.UTypeTwo(), 'postgresql')
self.composite = self.variant.with_variant(
- self.UTypeThree(), 'mysql')
+ self.UTypeThree(), 'mysql')
def test_illegal_dupe(self):
v = self.UTypeOne().with_variant(
"in the mapping for this Variant",
lambda: v.with_variant(self.UTypeThree(), 'postgresql')
)
+
def test_compile(self):
self.assert_compile(
self.variant,
def test_bind_process(self):
eq_(
self.variant._cached_bind_processor(
- dialects.mysql.dialect())('foo'),
+ dialects.mysql.dialect())('foo'),
'fooUONE'
)
eq_(
self.variant._cached_bind_processor(
- default.DefaultDialect())('foo'),
+ default.DefaultDialect())('foo'),
'fooUONE'
)
eq_(
self.variant._cached_bind_processor(
- dialects.postgresql.dialect())('foo'),
+ dialects.postgresql.dialect())('foo'),
'fooUTWO'
)
def test_bind_process_composite(self):
assert self.composite._cached_bind_processor(
- dialects.mysql.dialect()) is None
+ dialects.mysql.dialect()) is None
eq_(
self.composite._cached_bind_processor(
- default.DefaultDialect())('foo'),
+ default.DefaultDialect())('foo'),
'fooUONE'
)
eq_(
self.composite._cached_bind_processor(
- dialects.postgresql.dialect())('foo'),
+ dialects.postgresql.dialect())('foo'),
'fooUTWO'
)
+
class UnicodeTest(fixtures.TestBase):
"""Exercise the Unicode and related types.
expected
)
- data = util.u("Alors vous imaginez ma surprise, au lever du jour, quand "\
- "une drôle de petite voix m’a réveillé. "\
- "Elle disait: « S’il vous plaît… dessine-moi un mouton! »")
+ data = util.u("Alors vous imaginez ma surprise, au lever du jour, quand "
+ "une drôle de petite voix m’a réveillé. "
+ "Elle disait: « S’il vous plaît… dessine-moi un mouton! »")
def test_unicode_warnings_typelevel_native_unicode(self):
def setup_class(cls):
global enum_table, non_native_enum_table, metadata
metadata = MetaData(testing.db)
- enum_table = Table('enum_table', metadata,
- Column("id", Integer, primary_key=True),
- Column('someenum', Enum('one', 'two', 'three', name='myenum'))
- )
-
- non_native_enum_table = Table('non_native_enum_table', metadata,
- Column("id", Integer, primary_key=True),
- Column('someenum', Enum('one', 'two', 'three', native_enum=False)),
- )
+ enum_table = Table(
+ 'enum_table', metadata, Column(
+ "id", Integer, primary_key=True), Column(
+ 'someenum', Enum(
+ 'one', 'two', 'three', name='myenum')))
+
+ non_native_enum_table = Table(
+ 'non_native_enum_table', metadata, Column(
+ "id", Integer, primary_key=True), Column(
+ 'someenum', Enum(
+ 'one', 'two', 'three', native_enum=False)), )
metadata.create_all()
metadata.drop_all()
@testing.fails_on('postgresql+zxjdbc',
- 'zxjdbc fails on ENUM: column "XXX" is of type XXX '
- 'but expression is of type character varying')
+ 'zxjdbc fails on ENUM: column "XXX" is of type XXX '
+ 'but expression is of type character varying')
def test_round_trip(self):
enum_table.insert().execute([
{'id': 1, 'someenum': 'two'},
eq_(
non_native_enum_table.select().
- order_by(non_native_enum_table.c.id).execute().fetchall(),
+ order_by(non_native_enum_table.c.id).execute().fetchall(),
[
(1, 'two'),
(2, 'two'),
'mysql', 'Inconsistent behavior across various OS/drivers')
def test_constraint(self):
assert_raises(exc.DBAPIError,
- enum_table.insert().execute,
- {'id': 4, 'someenum': 'four'}
- )
+ enum_table.insert().execute,
+ {'id': 4, 'someenum': 'four'}
+ )
def test_non_native_constraint_custom_type(self):
class Foob(object):
def __init__(self, values):
self.impl = Enum(
- *[v.name for v in values],
- name="myenum",
- native_enum=False
- )
-
+ *[v.name for v in values],
+ name="myenum",
+ native_enum=False
+ )
def _set_table(self, table, column):
self.impl._set_table(table, column)
m = MetaData()
t1 = Table('t', m, Column('x', MyEnum([Foob('a'), Foob('b')])))
- const = [c for c in t1.constraints if isinstance(c, CheckConstraint)][0]
+ const = [
+ c for c in t1.constraints if isinstance(
+ c,
+ CheckConstraint)][0]
self.assert_compile(
AddConstraint(const),
dialect="default"
)
-
-
- @testing.fails_on('mysql',
- "the CHECK constraint doesn't raise an exception for unknown reason")
+ @testing.fails_on(
+ 'mysql',
+ "the CHECK constraint doesn't raise an exception for unknown reason")
def test_non_native_constraint(self):
assert_raises(exc.DBAPIError,
- non_native_enum_table.insert().execute,
- {'id': 4, 'someenum': 'four'}
- )
+ non_native_enum_table.insert().execute,
+ {'id': 4, 'someenum': 'four'}
+ )
def test_mock_engine_no_prob(self):
"""ensure no 'checkfirst' queries are run when enums
e = engines.mock_engine()
t = Table('t1', MetaData(),
- Column('x', Enum("x", "y", name="pge"))
- )
+ Column('x', Enum("x", "y", name="pge"))
+ )
t.create(e, checkfirst=False)
# basically looking for the start of
# the constraint, or the ENUM def itself,
def test_repr(self):
e = Enum("x", "y", name="somename", convert_unicode=True,
- quote=True, inherit_schema=True)
+ quote=True, inherit_schema=True)
eq_(
repr(e),
"Enum('x', 'y', name='somename', inherit_schema=True)"
)
+
class BinaryTest(fixtures.TestBase, AssertsExecutionResults):
__excluded_on__ = (
('mysql', '<', (4, 1, 1)), # screwy varbinary types
return value
metadata = MetaData(testing.db)
- binary_table = Table('binary_table', metadata,
- Column('primary_id', Integer, primary_key=True, test_needs_autoincrement=True),
- Column('data', LargeBinary),
- Column('data_slice', LargeBinary(100)),
- Column('misc', String(30)),
- Column('pickled', PickleType),
- Column('mypickle', MyPickleType)
- )
+ binary_table = Table(
+ 'binary_table', metadata, Column(
+ 'primary_id', Integer, primary_key=True, test_needs_autoincrement=True), Column(
+ 'data', LargeBinary), Column(
+ 'data_slice', LargeBinary(100)), Column(
+ 'misc', String(30)), Column(
+ 'pickled', PickleType), Column(
+ 'mypickle', MyPickleType))
metadata.create_all()
@engines.close_first
stream1 = self.load_stream('binary_data_one.dat')
stream2 = self.load_stream('binary_data_two.dat')
binary_table.insert().execute(
- primary_id=1,
- misc='binary_data_one.dat',
- data=stream1,
- data_slice=stream1[0:100],
- pickled=testobj1,
- mypickle=testobj3)
+ primary_id=1,
+ misc='binary_data_one.dat',
+ data=stream1,
+ data_slice=stream1[0:100],
+ pickled=testobj1,
+ mypickle=testobj3)
binary_table.insert().execute(
- primary_id=2,
- misc='binary_data_two.dat',
- data=stream2,
- data_slice=stream2[0:99],
- pickled=testobj2)
+ primary_id=2,
+ misc='binary_data_two.dat',
+ data=stream2,
+ data_slice=stream2[0:99],
+ pickled=testobj2)
binary_table.insert().execute(
- primary_id=3,
- misc='binary_data_two.dat',
- data=None,
- data_slice=stream2[0:99],
- pickled=None)
+ primary_id=3,
+ misc='binary_data_two.dat',
+ data=None,
+ data_slice=stream2[0:99],
+ pickled=None)
for stmt in (
binary_table.select(order_by=binary_table.c.primary_id),
text(
"select * from binary_table order by binary_table.primary_id",
typemap={'pickled': PickleType,
- 'mypickle': MyPickleType,
- 'data': LargeBinary, 'data_slice': LargeBinary},
+ 'mypickle': MyPickleType,
+ 'data': LargeBinary, 'data_slice': LargeBinary},
bind=testing.db)
):
l = stmt.execute().fetchall()
data = os.urandom(32)
binary_table.insert().execute(data=data)
eq_(binary_table.select().
- where(binary_table.c.data == data).alias().
- count().scalar(), 1)
-
+ where(binary_table.c.data == data).alias().
+ count().scalar(), 1)
@testing.requires.binary_literals
def test_literal_roundtrip(self):
compiled = select([cast(literal(util.b("foo")), LargeBinary)]).compile(
- dialect=testing.db.dialect,
- compile_kwargs={"literal_binds": True})
+ dialect=testing.db.dialect,
+ compile_kwargs={"literal_binds": True})
result = testing.db.execute(compiled)
eq_(result.scalar(), util.b("foo"))
with open(f, mode='rb') as o:
return o.read()
-class ExpressionTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
+
+class ExpressionTest(
+ fixtures.TestBase,
+ AssertsExecutionResults,
+ AssertsCompiledSQL):
__dialect__ = 'default'
@classmethod
def get_col_spec(self):
return "INT"
+
def bind_processor(self, dialect):
def process(value):
return value * 10
return process
+
def result_processor(self, dialect, coltype):
def process(value):
return value / 10
def adapt_operator(self, op):
return {operators.add: operators.sub,
- operators.sub: operators.add}.get(op, op)
+ operators.sub: operators.add}.get(op, op)
class MyTypeDec(types.TypeDecorator):
impl = String
meta = MetaData(testing.db)
test_table = Table('test', meta,
- Column('id', Integer, primary_key=True),
- Column('data', String(30)),
- Column('atimestamp', Date),
- Column('avalue', MyCustomType),
- Column('bvalue', MyTypeDec(50)),
- )
+ Column('id', Integer, primary_key=True),
+ Column('data', String(30)),
+ Column('atimestamp', Date),
+ Column('avalue', MyCustomType),
+ Column('bvalue', MyTypeDec(50)),
+ )
meta.create_all()
test_table.insert().execute({
- 'id': 1,
- 'data': 'somedata',
- 'atimestamp': datetime.date(2007, 10, 15),
- 'avalue': 25, 'bvalue': 'foo'})
+ 'id': 1,
+ 'data': 'somedata',
+ 'atimestamp': datetime.date(2007, 10, 15),
+ 'avalue': 25, 'bvalue': 'foo'})
@classmethod
def teardown_class(cls):
eq_(
testing.db.execute(
- select([test_table.c.id, test_table.c.data, test_table.c.atimestamp])
- .where(expr),
- {"thedate": datetime.date(2007, 10, 15)}).fetchall(),
+ select([test_table.c.id, test_table.c.data, test_table.c.atimestamp])
+ .where(expr),
+ {"thedate": datetime.date(2007, 10, 15)}).fetchall(),
[(1, 'somedata', datetime.date(2007, 10, 15))]
)
eq_(
testing.db.execute(test_table.select().where(expr),
- {'somevalue': 25}).fetchall(),
+ {'somevalue': 25}).fetchall(),
[(1, 'somedata', datetime.date(2007, 10, 15), 25,
- 'BIND_INfooBIND_OUT')]
+ 'BIND_INfooBIND_OUT')]
)
expr = test_table.c.bvalue == bindparam("somevalue")
eq_(
testing.db.execute(test_table.select().where(expr),
- {"somevalue": "foo"}).fetchall(),
+ {"somevalue": "foo"}).fetchall(),
[(1, 'somedata',
datetime.date(2007, 10, 15), 25, 'BIND_INfooBIND_OUT')]
)
expr = column('foo', CHAR) == "asdf"
eq_(expr.right.type.__class__, CHAR)
-
def test_typedec_operator_adapt(self):
expr = test_table.c.bvalue + "hi"
class CoerceNothing(TypeDecorator):
coerce_to_is_types = ()
impl = Integer
+
class CoerceBool(TypeDecorator):
coerce_to_is_types = (bool, )
impl = Boolean
+
class CoerceNone(TypeDecorator):
coerce_to_is_types = (type(None),)
impl = Integer
dialect=default.DefaultDialect(supports_native_boolean=True)
)
-
def test_typedec_righthand_coercion(self):
class MyTypeDec(types.TypeDecorator):
impl = String
):
for other in (Numeric(10, 2), Integer):
expr = op(
- column('bar', types.Numeric(10, 2)),
- column('foo', other)
- )
+ column('bar', types.Numeric(10, 2)),
+ column('foo', other)
+ )
assert isinstance(expr.type, types.Numeric)
expr = op(
- column('foo', other),
- column('bar', types.Numeric(10, 2))
- )
+ column('foo', other),
+ column('bar', types.Numeric(10, 2))
+ )
assert isinstance(expr.type, types.Numeric)
def test_null_comparison(self):
assert distinct(test_table.c.data).type == test_table.c.data.type
assert test_table.c.data.distinct().type == test_table.c.data.type
+
class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = 'default'
def test_string_collation(self):
self.assert_compile(String(50, collation="FOO"),
- 'VARCHAR(50) COLLATE "FOO"')
+ 'VARCHAR(50) COLLATE "FOO"')
def test_char_plain(self):
self.assert_compile(CHAR(), "CHAR")
def test_char_collation(self):
self.assert_compile(CHAR(50, collation="FOO"),
- 'CHAR(50) COLLATE "FOO"')
+ 'CHAR(50) COLLATE "FOO"')
def test_text_plain(self):
self.assert_compile(Text(), "TEXT")
def test_text_collation(self):
self.assert_compile(Text(collation="FOO"),
- 'TEXT COLLATE "FOO"')
+ 'TEXT COLLATE "FOO"')
def test_default_compile_pg_inet(self):
self.assert_compile(dialects.postgresql.INET(), "INET",
- allow_dialect_select=True)
+ allow_dialect_select=True)
def test_default_compile_pg_float(self):
self.assert_compile(dialects.postgresql.FLOAT(), "FLOAT",
- allow_dialect_select=True)
+ allow_dialect_select=True)
def test_default_compile_mysql_integer(self):
self.assert_compile(
- dialects.mysql.INTEGER(display_width=5), "INTEGER(5)",
- allow_dialect_select=True)
+ dialects.mysql.INTEGER(display_width=5), "INTEGER(5)",
+ allow_dialect_select=True)
def test_numeric_plain(self):
self.assert_compile(types.NUMERIC(), 'NUMERIC')
self.assert_compile(types.DECIMAL(2, 4), 'DECIMAL(2, 4)')
-
-
class NumericRawSQLTest(fixtures.TestBase):
"""Test what DBAPIs and dialects return without any typing
def _fixture(self, metadata, type, data):
t = Table('t', metadata,
- Column("val", type)
- )
+ Column("val", type)
+ )
metadata.create_all()
t.insert().execute(val=data)
eq_(val, 46.583)
-
-
class IntervalTest(fixtures.TestBase, AssertsExecutionResults):
@classmethod
def setup_class(cls):
global interval_table, metadata
metadata = MetaData(testing.db)
- interval_table = Table("intervaltable", metadata,
- Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
- Column("native_interval", Interval()),
- Column("native_interval_args", Interval(day_precision=3, second_precision=6)),
- Column("non_native_interval", Interval(native=False)),
- )
+ interval_table = Table(
+ "intervaltable", metadata, Column(
+ "id", Integer, primary_key=True, test_needs_autoincrement=True), Column(
+ "native_interval", Interval()), Column(
+ "native_interval_args", Interval(
+ day_precision=3, second_precision=6)), Column(
+ "non_native_interval", Interval(
+ native=False)), )
metadata.create_all()
@engines.close_first
assert adapted.native is False
eq_(str(adapted), "DATETIME")
- @testing.fails_on("postgresql+zxjdbc", "Not yet known how to pass values of the INTERVAL type")
- @testing.fails_on("oracle+zxjdbc", "Not yet known how to pass values of the INTERVAL type")
+ @testing.fails_on(
+ "postgresql+zxjdbc",
+ "Not yet known how to pass values of the INTERVAL type")
+ @testing.fails_on(
+ "oracle+zxjdbc",
+ "Not yet known how to pass values of the INTERVAL type")
def test_roundtrip(self):
small_delta = datetime.timedelta(days=15, seconds=5874)
delta = datetime.timedelta(414)
interval_table.insert().execute(
- native_interval=small_delta,
- native_interval_args=delta,
- non_native_interval=delta
- )
+ native_interval=small_delta,
+ native_interval_args=delta,
+ non_native_interval=delta
+ )
row = interval_table.select().execute().first()
eq_(row['native_interval'], small_delta)
eq_(row['native_interval_args'], delta)
eq_(row['non_native_interval'], delta)
- @testing.fails_on("oracle+zxjdbc", "Not yet known how to pass values of the INTERVAL type")
+ @testing.fails_on(
+ "oracle+zxjdbc",
+ "Not yet known how to pass values of the INTERVAL type")
def test_null(self):
- interval_table.insert().execute(id=1, native_inverval=None, non_native_interval=None)
+ interval_table.insert().execute(
+ id=1,
+ native_inverval=None,
+ non_native_interval=None)
row = interval_table.select().execute().first()
eq_(row['native_interval'], None)
eq_(row['native_interval_args'], None)
class BooleanTest(
fixtures.TablesTest, AssertsExecutionResults, AssertsCompiledSQL):
+
"""test edge cases for booleans. Note that the main boolean test suite
is now in testing/suite/test_types.py
@classmethod
def define_tables(cls, metadata):
Table('boolean_table', metadata,
- Column('id', Integer, primary_key=True, autoincrement=False),
- Column('value', Boolean),
- Column('unconstrained_value', Boolean(create_constraint=False)),
- )
-
- @testing.fails_on('mysql',
- "The CHECK clause is parsed but ignored by all storage engines.")
+ Column('id', Integer, primary_key=True, autoincrement=False),
+ Column('value', Boolean),
+ Column('unconstrained_value', Boolean(create_constraint=False)),
+ )
+
+ @testing.fails_on(
+ 'mysql',
+ "The CHECK clause is parsed but ignored by all storage engines.")
@testing.fails_on('mssql',
- "FIXME: MS-SQL 2005 doesn't honor CHECK ?!?")
+ "FIXME: MS-SQL 2005 doesn't honor CHECK ?!?")
@testing.skip_if(lambda: testing.db.dialect.supports_native_boolean)
def test_constraint(self):
assert_raises((exc.IntegrityError, exc.ProgrammingError),
- testing.db.execute,
- "insert into boolean_table (id, value) values(1, 5)")
+ testing.db.execute,
+ "insert into boolean_table (id, value) values(1, 5)")
@testing.skip_if(lambda: testing.db.dialect.supports_native_boolean)
def test_unconstrained(self):
testing.db.execute(
- "insert into boolean_table (id, unconstrained_value) values (1, 5)")
+ "insert into boolean_table "
+ "(id, unconstrained_value) values (1, 5)")
def test_non_native_constraint_custom_type(self):
class Foob(object):
m = MetaData()
t1 = Table('t', m, Column('x', MyBool()))
- const = [c for c in t1.constraints if isinstance(c, CheckConstraint)][0]
+ const = [
+ c for c in t1.constraints if isinstance(
+ c,
+ CheckConstraint)][0]
self.assert_compile(
AddConstraint(const),
assert p1.compare_values(p1.copy_value(obj), obj)
assert_raises(NotImplementedError,
- p1.compare_values,
- pickleable.BrokenComparable('foo'),
- pickleable.BrokenComparable('foo'))
+ p1.compare_values,
+ pickleable.BrokenComparable('foo'),
+ pickleable.BrokenComparable('foo'))
def test_nonmutable_comparison(self):
p1 = PickleType()
):
assert p1.compare_values(p1.copy_value(obj), obj)
+
class CallableTest(fixtures.TestBase):
@classmethod
ucode = util.partial(Unicode)
thing_table = Table('thing', meta,
- Column('name', ucode(20))
- )
+ Column('name', ucode(20))
+ )
assert isinstance(thing_table.c.name.type, Unicode)
thing_table.create()
ucode = util.partial(Unicode)
thang_table = Table('thang', meta,
- Column('name', type_=ucode(20), primary_key=True)
- )
+ Column('name', type_=ucode(20), primary_key=True)
+ )
assert isinstance(thang_table.c.name.type, Unicode)
thang_table.create()
-