except SQLite and Firebird. The names are::
test_schema
- test_schema_2 (only used on PostgreSQL)
+ test_schema_2 (only used on PostgreSQL and mssql)
Please refer to your vendor documentation for the proper syntax to create
these namespaces - the database user must have permission to create and drop
--- /dev/null
+.. change::
+ :tags: schema, reflection
+ :tickets: 5063
+
+ Added support for reflection of "computed" columns, which are now returned
+ as part of the structure returned by :meth:`.Inspector.get_columns`.
+ When reflecting full :class:`.Table` objects, computed columns will
+ be represented using the :class:`.Computed` construct.
from ...engine import reflection
from ...sql import compiler
from ...sql import expression
+from ...sql import func
from ...sql import quoted_name
from ...sql import util as sql_util
from ...types import BIGINT
def get_columns(self, connection, tablename, dbname, owner, schema, **kw):
# Get base columns
columns = ischema.columns
+ computed_cols = ischema.computed_columns
if owner:
whereclause = sql.and_(
columns.c.table_name == tablename,
columns.c.table_schema == owner,
)
+ table_fullname = "%s.%s" % (owner, tablename)
+ concat = func.concat(
+ columns.c.table_schema, ".", columns.c.table_name
+ )
+ join_on = computed_cols.c.object_id == func.object_id(concat)
else:
whereclause = columns.c.table_name == tablename
+ table_fullname = tablename
+ join_on = computed_cols.c.object_id == func.object_id(
+ columns.c.table_name
+ )
+
+ join_on = sql.and_(
+ join_on, columns.c.column_name == computed_cols.c.name
+ )
+ join = columns.join(computed_cols, onclause=join_on, isouter=True)
s = sql.select(
- [columns], whereclause, order_by=[columns.c.ordinal_position]
+ [
+ columns,
+ computed_cols.c.definition,
+ computed_cols.c.is_persisted,
+ ],
+ whereclause,
+ from_obj=join,
+ order_by=[columns.c.ordinal_position],
)
c = connection.execute(s)
cols = []
+
while True:
row = c.fetchone()
if row is None:
break
- (
- name,
- type_,
- nullable,
- charlen,
- numericprec,
- numericscale,
- default,
- collation,
- ) = (
- row[columns.c.column_name],
- row[columns.c.data_type],
- row[columns.c.is_nullable] == "YES",
- row[columns.c.character_maximum_length],
- row[columns.c.numeric_precision],
- row[columns.c.numeric_scale],
- row[columns.c.column_default],
- row[columns.c.collation_name],
- )
+ name = row[columns.c.column_name]
+ type_ = row[columns.c.data_type]
+ nullable = row[columns.c.is_nullable] == "YES"
+ charlen = row[columns.c.character_maximum_length]
+ numericprec = row[columns.c.numeric_precision]
+ numericscale = row[columns.c.numeric_scale]
+ default = row[columns.c.column_default]
+ collation = row[columns.c.collation_name]
+ definition = row[computed_cols.c.definition]
+ is_persisted = row[computed_cols.c.is_persisted]
+
coltype = self.ischema_names.get(type_, None)
kwargs = {}
"default": default,
"autoincrement": False,
}
+
+ if definition is not None and is_persisted is not None:
+ cdict["computed"] = {
+ "sqltext": definition,
+ "persisted": is_persisted,
+ }
+
cols.append(cdict)
# autoincrement and identity
colmap = {}
from ... import util
from ...ext.compiler import compiles
from ...sql import expression
+from ...types import Boolean
from ...types import Integer
from ...types import String
from ...types import TypeDecorator
Column("IS_UPDATABLE", String, key="is_updatable"),
schema="INFORMATION_SCHEMA",
)
+
+computed_columns = Table(
+ "computed_columns",
+ ischema,
+ Column("object_id", Integer),
+ Column("name", CoerceUnicode),
+ Column("is_computed", Boolean),
+ Column("is_persisted", Boolean),
+ Column("definition", CoerceUnicode),
+ schema="sys",
+)
if comment is not None:
comment = comment.replace("\\\\", "\\").replace("''", "'")
+ sqltext = spec.get("generated")
+ if sqltext is not None:
+ computed = dict(sqltext=sqltext)
+ persisted = spec.get("persistence")
+ if persisted is not None:
+ computed["persisted"] = persisted == "STORED"
+ col_kw["computed"] = computed
+
col_d = dict(
name=name, type=type_instance, default=default, comment=comment
)
r"(?:NULL|'(?:''|[^'])*'|[\w\(\)]+"
r"(?: +ON UPDATE [\w\(\)]+)?)"
r"))?"
+ r"(?: +(?:GENERATED ALWAYS)? ?AS +(?P<generated>\("
+ r".*\))? ?(?P<persistence>VIRTUAL|STORED)?)?"
r"(?: +(?P<autoincr>AUTO_INCREMENT))?"
r"(?: +COMMENT +'(?P<comment>(?:''|[^'])*)')?"
r"(?: +COLUMN_FORMAT +(?P<colfmt>\w+))?"
text = """
SELECT col.column_name, col.data_type, col.%(char_length_col)s,
col.data_precision, col.data_scale, col.nullable,
- col.data_default, com.comments\
- FROM all_tab_columns%(dblink)s col
+ col.data_default, com.comments, col.virtual_column\
+ FROM all_tab_cols%(dblink)s col
LEFT JOIN all_col_comments%(dblink)s com
ON col.table_name = com.table_name
AND col.column_name = com.column_name
AND col.owner = com.owner
WHERE col.table_name = :table_name
+ AND col.hidden_column = 'NO'
"""
if schema is not None:
params["owner"] = schema
nullable = row[5] == "Y"
default = row[6]
comment = row[7]
+ generated = row[8]
if coltype == "NUMBER":
if precision is None and scale == 0:
)
coltype = sqltypes.NULLTYPE
+ if generated == "YES":
+ computed = dict(sqltext=default)
+ default = None
+ else:
+ computed = None
+
cdict = {
"name": colname,
"type": coltype,
}
if orig_colname.lower() == orig_colname:
cdict["quote"] = True
+ if computed is not None:
+ cdict["computed"] = computed
columns.append(cdict)
return columns
table_oid = self.get_table_oid(
connection, table_name, schema, info_cache=kw.get("info_cache")
)
- SQL_COLS = """
+
+ generated = (
+ "a.attgenerated as generated"
+ if self.server_version_info >= (12,)
+ else "NULL as generated"
+ )
+ SQL_COLS = (
+ """
SELECT a.attname,
pg_catalog.format_type(a.atttypid, a.atttypmod),
(SELECT pg_catalog.pg_get_expr(d.adbin, d.adrelid)
AND a.atthasdef)
AS DEFAULT,
a.attnotnull, a.attnum, a.attrelid as table_oid,
- pgd.description as comment
+ pgd.description as comment,
+ %s
FROM pg_catalog.pg_attribute a
LEFT JOIN pg_catalog.pg_description pgd ON (
pgd.objoid = a.attrelid AND pgd.objsubid = a.attnum)
AND a.attnum > 0 AND NOT a.attisdropped
ORDER BY a.attnum
"""
+ % generated
+ )
s = (
sql.text(SQL_COLS)
.bindparams(sql.bindparam("table_oid", type_=sqltypes.Integer))
attnum,
table_oid,
comment,
+ generated,
) in rows:
column_info = self._get_column_info(
name,
enums,
schema,
comment,
+ generated,
)
columns.append(column_info)
return columns
enums,
schema,
comment,
+ generated,
):
def _handle_array_type(attype):
return (
"Did not recognize type '%s' of column '%s'" % (attype, name)
)
coltype = sqltypes.NULLTYPE
+
+ # If a zero byte (''), then not a generated column.
+ # Otherwise, s = stored. (Other values might be added in the future.)
+ if generated:
+ computed = dict(sqltext=default, persisted=generated == "s")
+ default = None
+ else:
+ computed = None
+
# adjust the default value
autoincrement = False
if default is not None:
autoincrement=autoincrement,
comment=comment,
)
+ if computed is not None:
+ column_info["computed"] = computed
return column_info
@reflection.cache
* ``default`` - the column's server default value - this is returned
as a string SQL expression.
- * ``attrs`` - dict containing optional column attributes
+ * ``autoincrement`` - indicates that the column is auto incremented -
+ this is returned as a boolean or 'auto'
+
+ * ``comment`` - (optional) the commnet on the column. Only some
+ dialects return this key
+
+ * ``computed`` - (optional) when present it indicates that this column
+ is computed by the database. Only some dialects return this key.
+ Returned as a dict with the keys:
+
+ * ``sqltext`` - the expression used to generate this column returned
+ as a string SQL expression
+
+ * ``persisted`` - (optional) boolean that indicates if the column is
+ stored in the table
+
+ .. versionadded:: 1.3.16 - added support for computed reflection.
+
+ * ``dialect_options`` - (optional) a dict with dialect specific options
+
:param table_name: string name of the table. For special quoting,
use :class:`.quoted_name`.
colargs.append(default)
+ if "computed" in col_d:
+ computed = sa_schema.Computed(**col_d["computed"])
+ colargs.append(computed)
+
if "sequence" in col_d:
self._reflect_col_sequence(col_d, colargs)
from .assertions import in_ # noqa
from .assertions import is_ # noqa
from .assertions import is_false # noqa
+from .assertions import is_instance_of # noqa
from .assertions import is_not_ # noqa
from .assertions import is_true # noqa
from .assertions import le_ # noqa
assert a <= b, msg or "%r != %r" % (a, b)
+def is_instance_of(a, b, msg=None):
+ assert isinstance(a, b), msg or "%r is not an instance of %r" % (a, b)
+
+
def is_true(a, msg=None):
is_(a, True, msg=msg)
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
+import re
import sys
import sqlalchemy as sa
if cls.metadata.tables and cls.run_create_tables:
cls.metadata.create_all(config.db)
+
+
+class ComputedReflectionFixtureTest(TablesTest):
+ run_inserts = run_deletes = None
+
+ __backend__ = True
+ __requires__ = ("computed_columns", "table_reflection")
+
+ regexp = re.compile(r"[\[\]\(\)\s`'\"]*")
+
+ def normalize(self, text):
+ return self.regexp.sub("", text).lower()
+
+ @classmethod
+ def define_tables(cls, metadata):
+ from .. import Integer
+ from .. import testing
+ from ..schema import Column
+ from ..schema import Computed
+ from ..schema import Table
+
+ Table(
+ "computed_default_table",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("normal", Integer),
+ Column("computed_col", Integer, Computed("normal + 42")),
+ Column("with_default", Integer, server_default="42"),
+ )
+
+ t = Table(
+ "computed_column_table",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("normal", Integer),
+ Column("computed_no_flag", Integer, Computed("normal + 42")),
+ )
+
+ t2 = Table(
+ "computed_column_table",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("normal", Integer),
+ Column("computed_no_flag", Integer, Computed("normal / 42")),
+ schema=config.test_schema,
+ )
+ if testing.requires.computed_columns_virtual.enabled:
+ t.append_column(
+ Column(
+ "computed_virtual",
+ Integer,
+ Computed("normal + 2", persisted=False),
+ )
+ )
+ t2.append_column(
+ Column(
+ "computed_virtual",
+ Integer,
+ Computed("normal / 2", persisted=False),
+ )
+ )
+ if testing.requires.computed_columns_stored.enabled:
+ t.append_column(
+ Column(
+ "computed_stored",
+ Integer,
+ Computed("normal - 42", persisted=True),
+ )
+ )
+ t2.append_column(
+ Column(
+ "computed_stored",
+ Integer,
+ Computed("normal * 42", persisted=True),
+ )
+ )
@property
def computed_columns(self):
+ "Supports computed columns"
+ return exclusions.closed()
+
+ @property
+ def computed_columns_stored(self):
+ "Supports computed columns with `persisted=True`"
+ return exclusions.closed()
+
+ @property
+ def computed_columns_virtual(self):
+ "Supports computed columns with `persisted=False`"
+ return exclusions.closed()
+
+ @property
+ def computed_columns_default_persisted(self):
+ """If the default persistence is virtual or stored when `persisted`
+ is omitted"""
+ return exclusions.closed()
+
+ @property
+ def computed_columns_reflect_persisted(self):
+ """If persistence information is returned by the reflection of
+ computed columns"""
return exclusions.closed()
eq_(tablenames[1].upper(), tablenames[1].lower())
-__all__ = ("ComponentReflectionTest", "HasTableTest", "NormalizedNameTest")
+class ComputedReflectionTest(fixtures.ComputedReflectionFixtureTest):
+ def test_computed_col_default_not_set(self):
+ insp = inspect(config.db)
+
+ cols = insp.get_columns("computed_column_table")
+ for col in cols:
+ if col["name"] == "with_default":
+ is_true("42" in col["default"])
+ elif not col["autoincrement"]:
+ is_(col["default"], None)
+
+ def test_get_column_returns_computed(self):
+ insp = inspect(config.db)
+
+ cols = insp.get_columns("computed_default_table")
+ data = {c["name"]: c for c in cols}
+ for key in ("id", "normal", "with_default"):
+ is_true("computed" not in data[key])
+ compData = data["computed_col"]
+ is_true("computed" in compData)
+ is_true("sqltext" in compData["computed"])
+ eq_(self.normalize(compData["computed"]["sqltext"]), "normal+42")
+ eq_(
+ "persisted" in compData["computed"],
+ testing.requires.computed_columns_reflect_persisted.enabled,
+ )
+ if testing.requires.computed_columns_reflect_persisted.enabled:
+ eq_(
+ compData["computed"]["persisted"],
+ testing.requires.computed_columns_default_persisted.enabled,
+ )
+
+ def check_column(self, data, column, sqltext, persisted):
+ is_true("computed" in data[column])
+ compData = data[column]["computed"]
+ eq_(self.normalize(compData["sqltext"]), sqltext)
+ if testing.requires.computed_columns_reflect_persisted.enabled:
+ is_true("persisted" in compData)
+ is_(compData["persisted"], persisted)
+
+ def test_get_column_returns_persisted(self):
+ insp = inspect(config.db)
+
+ cols = insp.get_columns("computed_column_table")
+ data = {c["name"]: c for c in cols}
+
+ self.check_column(
+ data,
+ "computed_no_flag",
+ "normal+42",
+ testing.requires.computed_columns_default_persisted.enabled,
+ )
+ if testing.requires.computed_columns_virtual.enabled:
+ self.check_column(
+ data, "computed_virtual", "normal+2", False,
+ )
+ if testing.requires.computed_columns_stored.enabled:
+ self.check_column(
+ data, "computed_stored", "normal-42", True,
+ )
+
+ def test_get_column_returns_persisted_with_schama(self):
+ insp = inspect(config.db)
+
+ cols = insp.get_columns(
+ "computed_column_table", schema=config.test_schema
+ )
+ data = {c["name"]: c for c in cols}
+
+ self.check_column(
+ data,
+ "computed_no_flag",
+ "normal/42",
+ testing.requires.computed_columns_default_persisted.enabled,
+ )
+ if testing.requires.computed_columns_virtual.enabled:
+ self.check_column(
+ data, "computed_virtual", "normal/2", False,
+ )
+ if testing.requires.computed_columns_stored.enabled:
+ self.check_column(
+ data, "computed_stored", "normal*42", True,
+ )
+
+
+__all__ = (
+ "ComponentReflectionTest",
+ "HasTableTest",
+ "NormalizedNameTest",
+ "ComputedReflectionTest",
+)
("my_custom_type(ARG1, ARG2)", ("ARG1", "ARG2")),
]:
column_info = dialect._get_column_info(
- "colname", sch, None, False, {}, {}, "public", None
+ "colname", sch, None, False, {}, {}, "public", None, ""
)
assert isinstance(column_info["type"], self.CustomType)
eq_(column_info["type"].arg1, args[0])
import unicodedata
import sqlalchemy as sa
+from sqlalchemy import Computed
from sqlalchemy import DefaultClause
from sqlalchemy import FetchedValue
from sqlalchemy import ForeignKey
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import in_
from sqlalchemy.testing import is_
+from sqlalchemy.testing import is_instance_of
+from sqlalchemy.testing import is_not_
from sqlalchemy.testing import is_true
from sqlalchemy.testing import mock
from sqlalchemy.testing import not_in_
eq_(str(table.c.x.server_default.arg), "1")
self._do_test("x", {"default": my_default}, assert_text_of_one)
+
+
+class ComputedColumnTest(fixtures.ComputedReflectionFixtureTest):
+ def check_table_column(self, table, name, text, persisted):
+ is_true(name in table.columns)
+ col = table.columns[name]
+ is_not_(col.computed, None)
+ is_instance_of(col.computed, Computed)
+
+ eq_(self.normalize(str(col.computed.sqltext)), text)
+ if testing.requires.computed_columns_reflect_persisted.enabled:
+ eq_(col.computed.persisted, persisted)
+ else:
+ is_(col.computed.persisted, None)
+
+ def test_table_reflection(self):
+ meta = MetaData()
+ table = Table("computed_column_table", meta, autoload_with=config.db)
+
+ self.check_table_column(
+ table,
+ "computed_no_flag",
+ "normal+42",
+ testing.requires.computed_columns_default_persisted.enabled,
+ )
+ if testing.requires.computed_columns_virtual.enabled:
+ self.check_table_column(
+ table, "computed_virtual", "normal+2", False,
+ )
+ if testing.requires.computed_columns_stored.enabled:
+ self.check_table_column(
+ table, "computed_stored", "normal-42", True,
+ )
@property
def python_profiling_backend(self):
return only_on([self._sqlite_memory_db])
+
+ @property
+ def computed_columns_stored(self):
+ return self.computed_columns + skip_if(["oracle", "firebird"])
+
+ @property
+ def computed_columns_virtual(self):
+ return self.computed_columns + skip_if(["postgresql", "firebird"])
+
+ @property
+ def computed_columns_default_persisted(self):
+ return self.computed_columns + only_if("postgresql")
+
+ @property
+ def computed_columns_reflect_persisted(self):
+ return self.computed_columns + skip_if("oracle")