--- /dev/null
+.. change::
+ :tags: bug, oracle
+ :tickets: 4264
+ :versions: 1.3.0b1
+
+ The Oracle BINARY_FLOAT and BINARY_DOUBLE datatypes now participate within
+ cx_Oracle.setinputsizes(), passing along NATIVE_FLOAT, so as to support the
+ NaN value. Additionally, :class:`.oracle.BINARY_FLOAT`,
+ :class:`.oracle.BINARY_DOUBLE` and :class:`.oracle.DOUBLE_PRECISION` now
+ subclass :class:`.Float`, since these are floating point datatypes, not
+ decimal. These datatypes were already defaulting the
+ :paramref:`.Float.asdecimal` flag to False in line with what
+ :class:`.Float` already does.
+
+.. change::
+ :tags: bug, oracle
+ :versions: 1.3.0b1
+
+ Added reflection capabilities for the :class:`.oracle.BINARY_FLOAT`,
+ :class:`.oracle.BINARY_DOUBLE` datatypes.
+
return sqltypes.Integer
-class DOUBLE_PRECISION(sqltypes.Numeric):
+class DOUBLE_PRECISION(sqltypes.Float):
__visit_name__ = 'DOUBLE_PRECISION'
- def __init__(self, precision=None, scale=None, asdecimal=None):
- if asdecimal is None:
- asdecimal = False
-
- super(DOUBLE_PRECISION, self).__init__(
- precision=precision, scale=scale, asdecimal=asdecimal)
-
-class BINARY_DOUBLE(sqltypes.Numeric):
+class BINARY_DOUBLE(sqltypes.Float):
__visit_name__ = 'BINARY_DOUBLE'
- def __init__(self, precision=None, scale=None, asdecimal=None):
- if asdecimal is None:
- asdecimal = False
-
- super(BINARY_DOUBLE, self).__init__(
- precision=precision, scale=scale, asdecimal=asdecimal)
-
-class BINARY_FLOAT(sqltypes.Numeric):
+class BINARY_FLOAT(sqltypes.Float):
__visit_name__ = 'BINARY_FLOAT'
- def __init__(self, precision=None, scale=None, asdecimal=None):
- if asdecimal is None:
- asdecimal = False
-
- super(BINARY_FLOAT, self).__init__(
- precision=precision, scale=scale, asdecimal=asdecimal)
-
class BFILE(sqltypes.LargeBinary):
__visit_name__ = 'BFILE'
'FLOAT': FLOAT,
'DOUBLE PRECISION': DOUBLE_PRECISION,
'LONG': LONG,
+ 'BINARY_DOUBLE': BINARY_DOUBLE,
+ 'BINARY_FLOAT': BINARY_FLOAT
}
def visit_BINARY_FLOAT(self, type_, **kw):
return self._generate_numeric(type_, "BINARY_FLOAT", **kw)
+ def visit_FLOAT(self, type_, **kw):
+ # don't support conversion between decimal/binary
+ # precision yet
+ kw['no_precision'] = True
+ return self._generate_numeric(type_, "FLOAT", **kw)
+
def visit_NUMBER(self, type_, **kw):
return self._generate_numeric(type_, "NUMBER", **kw)
- def _generate_numeric(self, type_, name, precision=None, scale=None, **kw):
+ def _generate_numeric(
+ self, type_, name, precision=None,
+ scale=None, no_precision=False, **kw):
if precision is None:
precision = type_.precision
if scale is None:
scale = getattr(type_, 'scale', None)
- if precision is None:
+ if no_precision or precision is None:
return name
elif scale is None:
n = "%(name)s(%(precision)s)"
coltype = INTEGER()
else:
coltype = NUMBER(precision, scale)
+ elif coltype == 'FLOAT':
+ # TODO: support "precision" here as "binary_precision"
+ coltype = FLOAT()
elif coltype in ('VARCHAR2', 'NVARCHAR2', 'CHAR'):
coltype = self.ischema_names.get(coltype)(length)
elif 'WITH TIME ZONE' in coltype:
return handler
-
class _OracleNumeric(sqltypes.Numeric):
is_number = False
return handler
+class _OracleBinaryFloat(_OracleNumeric):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.NATIVE_FLOAT
+
+
+class _OracleBINARY_FLOAT(_OracleBinaryFloat, oracle.BINARY_FLOAT):
+ pass
+
+
+class _OracleBINARY_DOUBLE(_OracleBinaryFloat, oracle.BINARY_DOUBLE):
+ pass
+
+
class _OracleNUMBER(_OracleNumeric):
is_number = True
colspecs = {
sqltypes.Numeric: _OracleNumeric,
sqltypes.Float: _OracleNumeric,
+ oracle.BINARY_FLOAT: _OracleBINARY_FLOAT,
+ oracle.BINARY_DOUBLE: _OracleBINARY_DOUBLE,
sqltypes.Integer: _OracleInteger,
oracle.NUMBER: _OracleNUMBER,
cx_Oracle.NCLOB, cx_Oracle.CLOB, cx_Oracle.LOB,
cx_Oracle.NCHAR, cx_Oracle.FIXED_NCHAR,
cx_Oracle.BLOB, cx_Oracle.FIXED_CHAR, cx_Oracle.TIMESTAMP,
- _OracleInteger
+ _OracleInteger, _OracleBINARY_FLOAT, _OracleBINARY_DOUBLE
}
self._is_cx_oracle_6 = self.cx_oracle_ver >= (6, )
key_to_dbapi_type = {}
for bindparam in self.compiled.bind_names:
key = self.compiled.bind_names[bindparam]
- dialect_impl = bindparam.type.dialect_impl(self.dialect)
+ dialect_impl = bindparam.type._unwrapped_dialect_impl(self.dialect)
dialect_impl_cls = type(dialect_impl)
dbtype = dialect_impl.get_dbapi_type(self.dialect.dbapi)
if dbtype is not None and (
except KeyError:
return self._dialect_info(dialect)['impl']
+ def _unwrapped_dialect_impl(self, dialect):
+ """Return the 'unwrapped' dialect impl for this type.
+
+ For a type that applies wrapping logic (e.g. TypeDecorator), give
+ us the real, actual dialect-level type that is used.
+
+ This is used by TypeDecorator itself as well at least one case where
+ dialects need to check that a particular specific dialect-level
+ type is in use, within the :meth:`.DefaultDialect.set_input_sizes`
+ method.
+
+ """
+ return self.dialect_impl(dialect)
+
def _cached_literal_processor(self, dialect):
"""Return a dialect-specific literal processor for this type."""
try:
# otherwise adapt the impl type, link
# to a copy of this TypeDecorator and return
# that.
- typedesc = self.load_dialect_impl(dialect).dialect_impl(dialect)
+ typedesc = self._unwrapped_dialect_impl(dialect)
tt = self.copy()
if not isinstance(tt, self.__class__):
raise AssertionError('Type object %s does not properly '
"""
return self.impl
+ def _unwrapped_dialect_impl(self, dialect):
+ """Return the 'unwrapped' dialect impl for this type.
+
+ For a type that applies wrapping logic (e.g. TypeDecorator), give
+ us the real, actual dialect-level type that is used.
+
+ This is used by TypeDecorator itself as well at least one case where
+ dialects need to check that a particular specific dialect-level
+ type is in use, within the :meth:`.DefaultDialect.set_input_sizes`
+ method.
+
+ """
+ return self.load_dialect_impl(dialect).dialect_impl(dialect)
+
def __getattr__(self, key):
"""Proxy all other undefined accessors to the underlying
implementation."""
ForeignKeyConstraint, Sequence, Float, DateTime, cast, UnicodeText, \
union, except_, type_coerce, or_, outerjoin, DATE, NCHAR, outparam, \
PrimaryKeyConstraint, FLOAT, INTEGER
-from sqlalchemy.dialects.oracle.base import NUMBER
+from sqlalchemy.dialects.oracle.base import NUMBER, BINARY_DOUBLE, \
+ BINARY_FLOAT, DOUBLE_PRECISION
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing.engines import testing_engine
from sqlalchemy.testing.schema import Table, Column
(NUMBER, NUMBER(),),
]
self._run_test(specs, ['precision', 'scale'])
+
+ def test_float_types(self):
+ specs = [
+ (DOUBLE_PRECISION(), FLOAT()),
+ # when binary_precision is supported
+ # (DOUBLE_PRECISION(), oracle.FLOAT(binary_precision=126)),
+ (BINARY_DOUBLE(), BINARY_DOUBLE()),
+ (BINARY_FLOAT(), BINARY_FLOAT()),
+ (FLOAT(5), FLOAT(),),
+ # when binary_precision is supported
+ # (FLOAT(5), oracle.FLOAT(binary_precision=5),),
+ (FLOAT(), FLOAT()),
+ # when binary_precision is supported
+ # (FLOAT(5), oracle.FLOAT(binary_precision=126),),
+ ]
+ self._run_test(specs, ['precision'])
ForeignKeyConstraint, Sequence, Float, DateTime, cast, UnicodeText, \
union, except_, type_coerce, or_, outerjoin, DATE, NCHAR, outparam, \
PrimaryKeyConstraint, FLOAT
+from sqlalchemy.sql.sqltypes import NullType
from sqlalchemy.util import u, b
from sqlalchemy import util
from sqlalchemy.testing import assert_raises, assert_raises_message
[(decimal.Decimal("Infinity"), ), (decimal.Decimal("-Infinity"), )]
)
+ @testing.provide_metadata
+ def test_numeric_nan_float(self):
+ m = self.metadata
+ t1 = Table('t1', m,
+ Column("intcol", Integer),
+ Column("numericcol", oracle.BINARY_DOUBLE(asdecimal=False)))
+ t1.create()
+ t1.insert().execute([
+ dict(
+ intcol=1,
+ numericcol=float("nan")
+ ),
+ dict(
+ intcol=2,
+ numericcol=float("-nan")
+ ),
+ ])
+
+ eq_(
+ [
+ tuple(str(col) for col in row)
+ for row in select([t1.c.numericcol]).
+ order_by(t1.c.intcol).execute()
+ ],
+ [('nan', ), ('nan', )]
+ )
+
+ eq_(
+ [
+ tuple(str(col) for col in row)
+ for row in testing.db.execute(
+ "select numericcol from t1 order by intcol"
+ )
+ ],
+ [('nan', ), ('nan', )]
+
+ )
+
+ # needs https://github.com/oracle/python-cx_Oracle/issues/184#issuecomment-391399292
+ @testing.provide_metadata
+ def _dont_test_numeric_nan_decimal(self):
+ m = self.metadata
+ t1 = Table('t1', m,
+ Column("intcol", Integer),
+ Column("numericcol", oracle.BINARY_DOUBLE(asdecimal=True)))
+ t1.create()
+ t1.insert().execute([
+ dict(
+ intcol=1,
+ numericcol=decimal.Decimal("NaN")
+ ),
+ dict(
+ intcol=2,
+ numericcol=decimal.Decimal("-NaN")
+ ),
+ ])
+
+ eq_(
+ select([t1.c.numericcol]).
+ order_by(t1.c.intcol).execute().fetchall(),
+ [(decimal.Decimal("NaN"), ), (decimal.Decimal("NaN"), )]
+ )
+
+ eq_(
+ testing.db.execute(
+ "select numericcol from t1 order by intcol").fetchall(),
+ [(decimal.Decimal("NaN"), ), (decimal.Decimal("NaN"), )]
+ )
+
@testing.provide_metadata
def test_numerics_broken_inspection(self):
"""Numeric scenarios where Oracle type info is 'broken',
@testing.provide_metadata
def _test_setinputsizes(self, datatype, value, sis_value):
+ class TestTypeDec(TypeDecorator):
+ impl = NullType()
+
+ def load_dialect_impl(self, dialect):
+ if dialect.name == 'oracle':
+ return dialect.type_descriptor(datatype)
+ else:
+ return self.impl
+
m = self.metadata
- t1 = Table('t1', m, Column('foo', datatype))
- t1.create()
+ # Oracle can have only one column of type LONG so we make three
+ # tables rather than one table w/ three columns
+ t1 = Table(
+ 't1', m,
+ Column('foo', datatype),
+ )
+ t2 = Table(
+ 't2', m,
+ Column('foo', NullType().with_variant(datatype, "oracle")),
+ )
+ t3 = Table(
+ 't3', m,
+ Column('foo', TestTypeDec())
+ )
+ m.create_all()
class CursorWrapper(object):
# cx_oracle cursor can't be modified so we have to
with testing.db.connect() as conn:
connection_fairy = conn.connection
- with mock.patch.object(
- connection_fairy, "cursor",
- lambda: CursorWrapper(connection_fairy)
- ):
- conn.execute(
- t1.insert(), {"foo": value}
- )
-
- if sis_value:
- eq_(
- conn.info['mock'].mock_calls,
- [mock.call.setinputsizes(foo=sis_value)]
- )
- else:
- eq_(
- conn.info['mock'].mock_calls,
- [mock.call.setinputsizes()]
- )
+ for tab in [t1, t2, t3]:
+ with mock.patch.object(
+ connection_fairy, "cursor",
+ lambda: CursorWrapper(connection_fairy)
+ ):
+ conn.execute(
+ tab.insert(),
+ {"foo": value}
+ )
+
+ if sis_value:
+ eq_(
+ conn.info['mock'].mock_calls,
+ [mock.call.setinputsizes(foo=sis_value)]
+ )
+ else:
+ eq_(
+ conn.info['mock'].mock_calls,
+ [mock.call.setinputsizes()]
+ )
def test_smallint_setinputsizes(self):
self._test_setinputsizes(
def test_float_setinputsizes(self):
self._test_setinputsizes(Float(15), 25.34534, None)
+ def test_binary_double_setinputsizes(self):
+ self._test_setinputsizes(
+ oracle.BINARY_DOUBLE, 25.34534,
+ testing.db.dialect.dbapi.NATIVE_FLOAT)
+
+ def test_binary_float_setinputsizes(self):
+ self._test_setinputsizes(
+ oracle.BINARY_FLOAT, 25.34534,
+ testing.db.dialect.dbapi.NATIVE_FLOAT)
+
+ def test_double_precision_setinputsizes(self):
+ self._test_setinputsizes(
+ oracle.DOUBLE_PRECISION, 25.34534,
+ None)
+
def test_unicode(self):
self._test_setinputsizes(
Unicode(30), u("test"), testing.db.dialect.dbapi.NCHAR)