all_objects=all_objects,
)
+ def maybe_int(value):
+ if isinstance(value, float) and value.is_integer():
+ return int(value)
+ else:
+ return value
+
for row_dict in result:
table_name = self.normalize_name(row_dict["table_name"])
orig_colname = row_dict["column_name"]
colname = self.normalize_name(orig_colname)
coltype = row_dict["data_type"]
- precision = row_dict["data_precision"]
+ precision = maybe_int(row_dict["data_precision"])
if coltype == "NUMBER":
- scale = row_dict["data_scale"]
+ scale = maybe_int(row_dict["data_scale"])
if precision is None and scale == 0:
coltype = INTEGER()
else:
coltype = FLOAT(binary_precision=precision)
elif coltype in ("VARCHAR2", "NVARCHAR2", "CHAR", "NCHAR"):
- coltype = self.ischema_names.get(coltype)(
- row_dict["char_length"]
- )
+ char_length = maybe_int(row_dict["char_length"])
+ coltype = self.ischema_names.get(coltype)(char_length)
elif "WITH TIME ZONE" in coltype:
coltype = TIMESTAMP(timezone=True)
else:
# coding: utf-8
+from sqlalchemy import CHAR
from sqlalchemy import Double
from sqlalchemy import exc
from sqlalchemy import FLOAT
from sqlalchemy import INTEGER
from sqlalchemy import Integer
from sqlalchemy import MetaData
+from sqlalchemy import NCHAR
from sqlalchemy import Numeric
from sqlalchemy import PrimaryKeyConstraint
from sqlalchemy import select
+from sqlalchemy import String
from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy import Unicode
from sqlalchemy.dialects.oracle.base import DOUBLE_PRECISION
from sqlalchemy.dialects.oracle.base import NUMBER
from sqlalchemy.dialects.oracle.base import REAL
+from sqlalchemy.dialects.oracle.types import NVARCHAR2
+from sqlalchemy.dialects.oracle.types import VARCHAR2
from sqlalchemy.engine import ObjectKind
+from sqlalchemy.sql.sqltypes import NVARCHAR
+from sqlalchemy.sql.sqltypes import VARCHAR
from sqlalchemy.testing import assert_warns
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import config
for i, (reflected_col, spec) in enumerate(zip(table.c, specs)):
expected_spec = spec[1]
reflected_type = reflected_col.type
- is_(type(reflected_type), type(expected_spec))
+ is_(type(reflected_type), type(expected_spec), spec[0])
for attr in attributes:
+ r_attr = getattr(reflected_type, attr)
+ e_attr = getattr(expected_spec, attr)
+ col = f"c{i+1}"
eq_(
- getattr(reflected_type, attr),
- getattr(expected_spec, attr),
- "Column %s: Attribute %s value of %s does not "
- "match %s for type %s"
- % (
- "c%i" % (i + 1),
- attr,
- getattr(reflected_type, attr),
- getattr(expected_spec, attr),
- spec[0],
- ),
+ r_attr,
+ e_attr,
+ f"Column {col}: Attribute {attr} value of {r_attr} "
+ f"does not match {e_attr} for type {spec[0]}",
+ )
+ eq_(
+ type(r_attr),
+ type(e_attr),
+ f"Column {col}: Attribute {attr} type do not match "
+ f"{type(r_attr)} != {type(e_attr)} for db type {spec[0]}",
)
def test_integer_types(self, metadata, connection):
]
self._run_test(metadata, connection, specs, ["precision"])
+ def test_string_types(
+ self,
+ metadata,
+ connection,
+ ):
+ specs = [
+ (String(125), VARCHAR(125)),
+ (String(42).with_variant(VARCHAR2(42), "oracle"), VARCHAR(42)),
+ (Unicode(125), VARCHAR(125)),
+ (Unicode(42).with_variant(NVARCHAR2(42), "oracle"), NVARCHAR(42)),
+ (CHAR(125), CHAR(125)),
+ (NCHAR(42), NCHAR(42)),
+ ]
+ self._run_test(metadata, connection, specs, ["length"])
+
class IdentityReflectionTest(fixtures.TablesTest):
__only_on__ = "oracle"