.. changelog::
:version: 1.2.0b1
+ .. change:: 1546
+ :tags: feature, sql, postgresql, mysql, oracle
+ :tickets: 1546
+
+ Added support for SQL comments on :class:`.Table` and :class:`.Column`
+ objects, via the new :paramref:`.Table.comment` and
+ :paramref:`.Column.comment` arguments. The comments are included
+ as part of DDL on table creation, either inline or via an appropriate
+ ALTER statement, and are also reflected back within table reflection,
+ as well as via the :class:`.Inspector`. Supported backends currently
+ include MySQL, Postgresql, and Oracle. Many thanks to Frazer McLean
+ for a large amount of effort on this.
+
+ .. seealso::
+
+ :ref:`change_1546`
+
.. change:: 3366
:tags: bug, orm
:tickets: 3366
New Features and Improvements - Core
====================================
+.. _change_1546:
+
+Support for table, column comments, including DDL and reflection
+----------------------------------------------------------------
+
+The Core receives support for string comments associated with tables
+and columns. These are specified via the :paramref:`.Table.comment` and
+:paramref:`.Column.comment` arguments::
+
+ Table(
+ 'my_table', metadata,
+ Column('q', Integer, comment="the Q value"),
+ comment="my Q table"
+ )
+
+Above, DDL will be rendered appropriately upon table create to associate
+the above comments with the table/ column within the schema. When
+the above table is autoloaded or inspected with :meth:`.Inspector.get_columns`,
+the comments are included. The table comment is also available independently
+using the :meth:`.Inspector.get_table_comment` method.
+
+Current backend support includes MySQL, Postgresql, and Oracle.
+
+:ticket:`1546`
+
.. _change_2694:
New "autoescape" option for startswith(), endswith()
if default is not None:
colspec.append('DEFAULT ' + default)
+ comment = column.comment
+ if comment is not None:
+ literal = self.sql_compiler.render_literal_value(
+ comment, sqltypes.String())
+ colspec.append('COMMENT ' + literal)
+
if column.table is not None \
and column is column.table._autoincrement_column and \
column.server_default is None:
if k.startswith('%s_' % self.dialect.name)
)
+ if table.comment is not None:
+ opts['COMMENT'] = table.comment
+
for opt in topological.sort([
('DEFAULT_CHARSET', 'COLLATE'),
('DEFAULT_CHARACTER_SET', 'COLLATE'),
"causes ON UPDATE/ON DELETE clauses to be ignored.")
return ""
+ def visit_set_table_comment(self, create):
+ return "ALTER TABLE %s COMMENT %s" % (
+ self.preparer.format_table(create.element),
+ self.sql_compiler.render_literal_value(
+ create.element.comment, sqltypes.String())
+ )
+
+ def visit_set_column_comment(self, create):
+ return "ALTER TABLE %s CHANGE %s %s" % (
+ self.preparer.format_table(create.element.table),
+ self.preparer.format_column(create.element),
+ self.get_column_specification(create.element)
+ )
+
class MySQLTypeCompiler(compiler.GenericTypeCompiler):
def _extend_numeric(self, type_, spec):
supports_sane_multi_rowcount = False
supports_multivalues_insert = True
+ supports_comments = True
+ inline_comments = True
default_paramstyle = 'format'
colspecs = colspecs
fkeys.append(fkey_d)
return fkeys
+ @reflection.cache
+ def get_table_comment(self, connection, table_name, schema=None, **kw):
+ parsed_state = self._parsed_state_or_create(
+ connection, table_name, schema, **kw)
+ return {"text": parsed_state.table_options.get('mysql_comment', None)}
+
@reflection.cache
def get_indexes(self, connection, table_name, schema=None, **kw):
# eliminates the need to deal with this later.
default = None
- col_d = dict(name=name, type=type_instance, default=default)
+ comment = spec.get('comment', None)
+
+ if comment is not None:
+ comment = comment.replace("\\\\", "\\").replace("''", "'")
+
+ col_d = dict(name=name, type=type_instance, default=default,
+ comment=comment)
col_d.update(col_kw)
state.columns.append(col_d)
# COLUMN_FORMAT (FIXED|DYNAMIC|DEFAULT)
# STORAGE (DISK|MEMORY)
self._re_column = _re_compile(
- r' '
- r'%(iq)s(?P<name>(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s +'
- r'(?P<coltype>\w+)'
- r'(?:\((?P<arg>(?:\d+|\d+,\d+|'
- r'(?:\x27(?:\x27\x27|[^\x27])*\x27,?)+))\))?'
- r'(?: +(?P<unsigned>UNSIGNED))?'
- r'(?: +(?P<zerofill>ZEROFILL))?'
- r'(?: +CHARACTER SET +(?P<charset>[\w_]+))?'
- r'(?: +COLLATE +(?P<collate>[\w_]+))?'
- r'(?: +(?P<notnull>(?:NOT )?NULL))?'
- r'(?: +DEFAULT +(?P<default>'
- r'(?:NULL|\x27(?:\x27\x27|[^\x27])*\x27|\w+'
- r'(?: +ON UPDATE \w+)?)'
- r'))?'
- r'(?: +(?P<autoincr>AUTO_INCREMENT))?'
- r'(?: +COMMENT +(P<comment>(?:\x27\x27|[^\x27])+))?'
- r'(?: +COLUMN_FORMAT +(?P<colfmt>\w+))?'
- r'(?: +STORAGE +(?P<storage>\w+))?'
- r'(?: +(?P<extra>.*))?'
- r',?$'
+ r" "
+ r"%(iq)s(?P<name>(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s +"
+ r"(?P<coltype>\w+)"
+ r"(?:\((?P<arg>(?:\d+|\d+,\d+|"
+ r"(?:'(?:''|[^'])*',?)+))\))?"
+ r"(?: +(?P<unsigned>UNSIGNED))?"
+ r"(?: +(?P<zerofill>ZEROFILL))?"
+ r"(?: +CHARACTER SET +(?P<charset>[\w_]+))?"
+ r"(?: +COLLATE +(?P<collate>[\w_]+))?"
+ r"(?: +(?P<notnull>(?:NOT )?NULL))?"
+ r"(?: +DEFAULT +(?P<default>"
+ r"(?:NULL|'(?:''|[^'])*'|\w+"
+ r"(?: +ON UPDATE \w+)?)"
+ r"))?"
+ r"(?: +(?P<autoincr>AUTO_INCREMENT))?"
+ r"(?: +COMMENT +'(?P<comment>(?:''|[^'])*)')?"
+ r"(?: +COLUMN_FORMAT +(?P<colfmt>\w+))?"
+ r"(?: +STORAGE +(?P<storage>\w+))?"
+ r"(?: +(?P<extra>.*))?"
+ r",?$"
% quotes
)
ischema_names = ischema_names
requires_name_normalize = True
+ supports_comments = True
supports_default_values = False
supports_empty_insert = False
char_length_col = 'data_length'
params = {"table_name": table_name}
- text = "SELECT column_name, data_type, %(char_length_col)s, "\
- "data_precision, data_scale, "\
- "nullable, data_default FROM ALL_TAB_COLUMNS%(dblink)s "\
- "WHERE table_name = :table_name"
+ text = """
+ SELECT col.column_name, col.data_type, col.%(char_length_col)s,
+ col.data_precision, col.data_scale, col.nullable,
+ col.data_default, com.comments\
+ FROM all_tab_columns%(dblink)s col
+ LEFT JOIN all_col_comments%(dblink)s com
+ ON col.table_name = com.table_name
+ AND col.column_name = com.column_name
+ AND col.owner = com.owner
+ WHERE col.table_name = :table_name
+ """
if schema is not None:
params['owner'] = schema
- text += " AND owner = :owner "
- text += " ORDER BY column_id"
+ text += " AND col.owner = :owner "
+ text += " ORDER BY col.column_id"
text = text % {'dblink': dblink, 'char_length_col': char_length_col}
c = connection.execute(sql.text(text), **params)
for row in c:
- (colname, orig_colname, coltype, length, precision, scale, nullable, default) = \
- (self.normalize_name(row[0]), row[0], row[1], row[
- 2], row[3], row[4], row[5] == 'Y', row[6])
+ colname = self.normalize_name(row[0])
+ orig_colname = row[0]
+ coltype = row[1]
+ length = row[2]
+ precision = row[3]
+ scale = row[4]
+ nullable = row[5] == 'Y'
+ default = row[6]
+ comment = row[7]
if coltype == 'NUMBER':
coltype = NUMBER(precision, scale)
'nullable': nullable,
'default': default,
'autoincrement': 'auto',
+ 'comment': comment,
}
if orig_colname.lower() == orig_colname:
cdict['quote'] = True
columns.append(cdict)
return columns
+ @reflection.cache
+ def get_table_comment(self, connection, table_name, schema=None,
+ resolve_synonyms=False, dblink='', **kw):
+
+ info_cache = kw.get('info_cache')
+ (table_name, schema, dblink, synonym) = \
+ self._prepare_reflection_args(connection, table_name, schema,
+ resolve_synonyms, dblink,
+ info_cache=info_cache)
+
+ COMMENT_SQL = """
+ SELECT comments
+ FROM user_tab_comments
+ WHERE table_name = :table_name
+ """
+
+ c = connection.execute(sql.text(COMMENT_SQL), table_name=table_name)
+ return {"text": c.scalar()}
+
@reflection.cache
def get_indexes(self, connection, table_name, schema=None,
resolve_synonyms=False, dblink='', **kw):
preexecute_autoincrement_sequences = True
postfetch_lastrowid = False
+ supports_comments = True
supports_default_values = True
supports_empty_insert = False
supports_multivalues_insert = True
WHERE d.adrelid = a.attrelid AND d.adnum = a.attnum
AND a.atthasdef)
AS DEFAULT,
- a.attnotnull, a.attnum, a.attrelid as table_oid
+ a.attnotnull, a.attnum, a.attrelid as table_oid,
+ pgd.description as comment
FROM pg_catalog.pg_attribute a
+ LEFT JOIN pg_catalog.pg_description pgd ON (
+ pgd.objoid = a.attrelid AND pgd.objsubid = a.attnum)
WHERE a.attrelid = :table_oid
AND a.attnum > 0 AND NOT a.attisdropped
ORDER BY a.attnum
# format columns
columns = []
- for name, format_type, default, notnull, attnum, table_oid in rows:
+ for name, format_type, default, notnull, attnum, table_oid, \
+ comment in rows:
column_info = self._get_column_info(
- name, format_type, default, notnull, domains, enums, schema)
+ name, format_type, default, notnull, domains, enums,
+ schema, comment)
columns.append(column_info)
return columns
def _get_column_info(self, name, format_type, default,
- notnull, domains, enums, schema):
+ notnull, domains, enums, schema, comment):
# strip (*) from character varying(5), timestamp(5)
# with time zone, geometry(POLYGON), etc.
attype = re.sub(r'\(.*\)', '', format_type)
match.group(2) + match.group(3)
column_info = dict(name=name, type=coltype, nullable=nullable,
- default=default, autoincrement=autoincrement)
+ default=default, autoincrement=autoincrement,
+ comment=comment)
return column_info
@reflection.cache
for name, uc in uniques.items()
]
+ @reflection.cache
+ def get_table_comment(self, connection, table_name, schema=None, **kw):
+ table_oid = self.get_table_oid(connection, table_name, schema,
+ info_cache=kw.get('info_cache'))
+
+ COMMENT_SQL = """
+ SELECT
+ pgd.description as table_comment
+ FROM
+ pg_catalog.pg_description pgd
+ WHERE
+ pgd.objsubid = 0 AND
+ pgd.objoid = :table_oid
+ """
+
+ c = connection.execute(sql.text(COMMENT_SQL), table_oid=table_oid)
+ return {"text": c.scalar()}
+
@reflection.cache
def get_check_constraints(
self, connection, table_name, schema=None, **kw):
type_compiler = compiler.GenericTypeCompiler
preparer = compiler.IdentifierPreparer
supports_alter = True
+ supports_comments = False
+ inline_comments = False
# the first value we'd get for an autoincrement
# column.
raise NotImplementedError()
+ def get_table_comment(
+ self, connection, table_name, schema=None, **kw):
+ r"""Return the "comment" for the table identified by `table_name`.
+
+ Given a string `table_name` and an optional string `schema`, return
+ table comment information as a dictionary with this key:
+
+ text
+ text of the comment
+
+ Raises ``NotImplementedError`` for dialects that don't support
+ comments.
+
+ .. versionadded:: 1.2
+
+ """
+
+ raise NotImplementedError()
+
def normalize_name(self, name):
"""convert the given name to lowercase if it is detected as
case insensitive.
return self.dialect.get_unique_constraints(
self.bind, table_name, schema, info_cache=self.info_cache, **kw)
+ def get_table_comment(self, table_name, schema=None, **kw):
+ """Return information about the table comment for ``table_name``.
+
+ Given a string ``table_name`` and an optional string ``schema``,
+ return table comment information as a dictionary with these keys:
+
+ text
+ text of the comment.
+
+ Raises ``NotImplementedError`` for a dialect that does not support
+ comments.
+
+ .. versionadded:: 1.2
+
+ """
+
+ return self.dialect.get_table_comment(
+ self.bind, table_name, schema, info_cache=self.info_cache,
+ **kw)
+
def get_check_constraints(self, table_name, schema=None, **kw):
"""Return information about check constraints in `table_name`.
table_name, schema, table, cols_by_orig_name,
include_columns, exclude_columns, reflection_options)
+ self._reflect_table_comment(
+ table_name, schema, table, reflection_options
+ )
+
def _reflect_column(
self, table, col_d, include_columns,
exclude_columns, cols_by_orig_name):
col_kw = dict(
(k, col_d[k])
- for k in ['nullable', 'autoincrement', 'quote', 'info', 'key']
+ for k in ['nullable', 'autoincrement', 'quote', 'info', 'key', 'comment']
if k in col_d
)
for const_d in constraints:
table.append_constraint(
sa_schema.CheckConstraint(**const_d))
+
+ def _reflect_table_comment(
+ self, table_name, schema, table, reflection_options):
+ try:
+ comment_dict = self.get_table_comment(table_name, schema)
+ except NotImplementedError:
+ return
+ else:
+ table.comment = comment_dict.get('text', None)
_CreateDropBase,
_DDLCompiles,
sort_tables,
- sort_tables_and_constraints
+ sort_tables_and_constraints,
+ SetTableComment,
+ DropTableComment,
+ SetColumnComment,
+ DropColumnComment,
)
self.process(create.element)
)
+ def visit_set_table_comment(self, create):
+ return "COMMENT ON TABLE %s IS %s" % (
+ self.preparer.format_table(create.element),
+ self.sql_compiler.render_literal_value(
+ create.element.comment, sqltypes.String())
+ )
+
+ def visit_drop_table_comment(self, drop):
+ return "COMMENT ON TABLE %s IS NULL" % \
+ self.preparer.format_table(drop.element)
+
+ def visit_set_column_comment(self, create):
+ return "COMMENT ON COLUMN %s IS %s" % (
+ self.preparer.format_column(
+ create.element, use_table=True, use_schema=True),
+ self.sql_compiler.render_literal_value(
+ create.element.comment, sqltypes.String())
+ )
+
+ def visit_drop_column_comment(self, drop):
+ return "COMMENT ON COLUMN %s IS NULL" % \
+ self.preparer.format_column(drop.element, use_table=True)
+
def visit_create_sequence(self, create):
text = "CREATE SEQUENCE %s" % \
self.preparer.format_sequence(create.element)
return self.quote(name, quote)
def format_column(self, column, use_table=False,
- name=None, table_name=None):
+ name=None, table_name=None, use_schema=False):
"""Prepare a quoted column name."""
if name is None:
if not getattr(column, 'is_literal', False):
if use_table:
return self.format_table(
- column.table, use_schema=False,
+ column.table, use_schema=use_schema,
name=table_name) + "." + self.quote(name)
else:
return self.quote(name)
if use_table:
return self.format_table(
- column.table, use_schema=False,
+ column.table, use_schema=use_schema,
name=table_name) + '.' + name
else:
return name
self._create_rule_disable)
+class SetTableComment(_CreateDropBase):
+ """Represent a COMMENT ON TABLE IS statement."""
+
+ __visit_name__ = "set_table_comment"
+
+
+class DropTableComment(_CreateDropBase):
+ """Represent a COMMENT ON TABLE IS NULL statement."""
+
+ __visit_name__ = "drop_table_comment"
+
+
+class SetColumnComment(_CreateDropBase):
+ """Represent a COMMENT ON COLUMN IS statement."""
+
+ __visit_name__ = "set_column_comment"
+
+
+class DropColumnComment(_CreateDropBase):
+ """Represent a COMMENT ON COLUMN IS NULL statement."""
+
+ __visit_name__ = "drop_column_comment"
+
+
class DDLBase(SchemaVisitor):
def __init__(self, connection):
self.connection = connection
for index in table.indexes:
self.traverse_single(index)
+ if self.dialect.supports_comments and not self.dialect.inline_comments:
+ if table.comment is not None:
+ self.connection.execute(SetTableComment(table))
+
+ for column in table.columns:
+ if column.comment is not None:
+ self.connection.execute(SetColumnComment(column))
+
table.dispatch.after_create(
table, self.connection,
checkfirst=self.checkfirst,
:param useexisting: Deprecated. Use :paramref:`.Table.extend_existing`.
+ :param comment: Optional string that will render an SQL comment on table
+ creation.
+
+ .. versionadded:: 1.2 Added the :paramref:`.Table.comment` parameter
+ to :class:`.Table`.
+
:param \**kw: Additional keyword arguments not mentioned above are
dialect specific, and passed in the form ``<dialectname>_<argname>``.
See the documentation regarding an individual dialect at
self.implicit_returning = kwargs.pop('implicit_returning', True)
+ self.comment = kwargs.pop('comment', None)
+
if 'info' in kwargs:
self.info = kwargs.pop('info')
if 'listeners' in kwargs:
if 'info' in kwargs:
self.info = kwargs.pop('info')
+ self.comment = kwargs.pop('comment', None)
+
if autoload:
if not autoload_replace:
# don't replace columns already present.
:ref:`metadata_defaults_toplevel`
:param doc: optional String that can be used by the ORM or similar
- to document attributes. This attribute does not render SQL
- comments (a future attribute 'comment' will achieve that).
+ to document attributes on the Python side. This attribute does
+ **not** render SQL comments; use the :paramref:`.Column.comment`
+ parameter for this purpose.
:param key: An optional string identifier which will identify this
``Column`` object on the :class:`.Table`. When a key is provided,
.. versionadded:: 0.8.3 Added the ``system=True`` parameter to
:class:`.Column`.
+ :param comment: Optional string that will render an SQL comment on
+ table creation.
+
+ .. versionadded:: 1.2 Added the :paramref:`.Column.comment`
+ parameter to :class:`.Column`.
+
+
"""
name = kwargs.pop('name', None)
self.autoincrement = kwargs.pop('autoincrement', "auto")
self.constraints = set()
self.foreign_keys = set()
+ self.comment = kwargs.pop('comment', None)
# check if this Column is proxying another column
if '_proxies' in kwargs:
% self.__class__.__name__)
+
@inspection._self_inspects
class FetchedValue(_NotAColumnExpr, SchemaEventTarget):
"""A marker for a transparent database-side default.
def table_reflection(self):
return exclusions.open()
+ @property
+ def comment_reflection(self):
+ return exclusions.closed()
+
@property
def view_column_reflection(self):
"""target database must support retrieval of the columns in a view,
schema=schema,
test_needs_fk=True,
)
+ Table('comment_test', metadata,
+ Column('id', sa.Integer, primary_key=True, comment='id comment'),
+ Column('data', sa.String(20), comment='data comment'),
+ schema=schema,
+ comment='the test table comment')
if testing.requires.index_reflection.enabled:
cls.define_index(metadata, users)
answer = ['email_addresses_v', 'users_v']
eq_(sorted(table_names), answer)
else:
- table_names = insp.get_table_names(schema,
- order_by=order_by)
+ table_names = [
+ t for t in insp.get_table_names(
+ schema,
+ order_by=order_by) if t not in ('comment_test', )]
+
if order_by == 'foreign_key':
answer = ['users', 'email_addresses', 'dingalings']
eq_(table_names, answer)
def test_get_table_names_fks(self):
self._test_get_table_names(order_by='foreign_key')
+ @testing.requires.comment_reflection
+ def test_get_comments(self):
+ self._test_get_comments()
+
+ @testing.requires.comment_reflection
+ @testing.requires.schemas
+ def test_get_comments_with_schema(self):
+ self._test_get_comments(testing.config.test_schema)
+
+ def _test_get_comments(self, schema=None):
+ insp = inspect(testing.db)
+
+ eq_(
+ insp.get_table_comment("comment_test", schema=schema),
+ {"text": "the test table comment"}
+ )
+
+ eq_(
+ insp.get_table_comment("users", schema=schema),
+ {"text": None}
+ )
+
+ eq_(
+ [
+ {"name": rec['name'], "comment": rec['comment']}
+ for rec in
+ insp.get_columns("comment_test", schema=schema)
+ ],
+ [
+ {'comment': 'id comment', 'name': 'id'},
+ {'comment': 'data comment', 'name': 'data'}
+ ]
+ )
+
@testing.requires.table_reflection
@testing.requires.schemas
def test_get_table_names_with_schema(self):
]:
column_info = dialect._get_column_info(
'colname', sch, None, False,
- {}, {}, 'public')
+ {}, {}, 'public', None)
assert isinstance(column_info['type'], self.CustomType)
eq_(column_info['type'].arg1, args[0])
eq_(column_info['type'].arg2, args[1])
assert set([t2.c.name, t2.c.id]) == set(r2.columns)
assert set([t2.c.name]) == set(r3.columns)
+ @testing.requires.comment_reflection
+ @testing.provide_metadata
+ def test_comment_reflection(self):
+ m1 = self.metadata
+ Table('sometable', m1,
+ Column('id', sa.Integer, comment='c1 comment'),
+ comment='t1 comment')
+ m1.create_all()
+ m2 = MetaData(testing.db)
+ t2 = Table('sometable', m2, autoload=True)
+
+ eq_(t2.comment, 't1 comment')
+ eq_(t2.c.id.comment, 'c1 comment')
+
@testing.requires.check_constraint_reflection
@testing.provide_metadata
def test_check_constraint_reflection(self):
def foreign_key_constraint_option_reflection(self):
return only_on(['postgresql', 'mysql', 'sqlite'])
+ @property
+ def comment_reflection(self):
+ return only_on(['postgresql', 'mysql', 'oracle'])
+
@property
def unbounded_varchar(self):
"""Target database must support VARCHAR with no length"""
return Mock(dialect=Mock(
supports_sequences=True,
has_table=Mock(side_effect=has_item),
- has_sequence=Mock(side_effect=has_item)
+ has_sequence=Mock(side_effect=has_item),
+ supports_comments=True,
+ inline_comments=False,
)
)
return m, t1, t2, s1, s2
+ def _table_comment_fixture(self):
+ m = MetaData()
+
+ c1 = Column('id', Integer, comment='c1')
+
+ t1 = Table(
+ 't1', m, c1,
+ comment='t1'
+ )
+
+ return m, t1, c1
+
+ def test_comment(self):
+ m, t1, c1 = self._table_comment_fixture()
+
+ generator = self._mock_create_fixture(
+ False, [t1], item_exists=lambda t: t not in ("t1",))
+
+ self._assert_create_comment([t1, t1, c1], generator, m)
+
def test_create_seq_checkfirst(self):
m, t1, t2, s1, s2 = self._table_seq_fixture()
generator = self._mock_create_fixture(
(schema.DropTable, schema.DropSequence, schema.DropConstraint),
elements, generator, argument)
+ def _assert_create_comment(self, elements, generator, argument):
+ self._assert_ddl(
+ (schema.CreateTable, schema.SetTableComment, schema.SetColumnComment),
+ elements, generator, argument)
+
def _assert_ddl(self, ddl_cls, elements, generator, argument):
generator.traverse_single(argument)
for call_ in generator.connection.execute.mock_calls: