for k in table.kwargs:
if k.startswith('mysql_'):
opt = k[6:].upper()
+
+ arg = table.kwargs[k]
+ if opt in _options_of_type_string:
+ arg = "'%s'" % arg.replace("\\", "\\\\").replace("'", "''")
+
+ if opt in ('DATA_DIRECTORY', 'INDEX_DIRECTORY',
+ 'DEFAULT_CHARACTER_SET', 'CHARACTER_SET', 'DEFAULT_CHARSET',
+ 'DEFAULT_COLLATE'):
+ opt = opt.replace('_', ' ')
+
joiner = '='
if opt in ('TABLESPACE', 'DEFAULT CHARACTER SET',
'CHARACTER SET', 'COLLATE'):
joiner = ' '
- table_opts.append(joiner.join((opt, table.kwargs[k])))
+ table_opts.append(joiner.join((opt, arg)))
return ' '.join(table_opts)
def visit_drop_index(self, drop):
def _parse_constraints(self, line):
"""Parse a KEY or CONSTRAINT line.
- line
- A line of SHOW CREATE TABLE output
+ :param line: A line of SHOW CREATE TABLE output
"""
# KEY
def _parse_table_name(self, line, state):
"""Extract the table name.
- line
- The first line of SHOW CREATE TABLE
+ :param line: The first line of SHOW CREATE TABLE
"""
regex, cleanup = self._pr_name
def _parse_table_options(self, line, state):
"""Build a dictionary of all reflected table-level options.
- line
- The final line of SHOW CREATE TABLE output.
+ :param line: The final line of SHOW CREATE TABLE output.
"""
options = {}
pass
else:
- r_eq_trim = self._re_options_util['=']
-
+ rest_of_line = line[:]
for regex, cleanup in self._pr_options:
- m = regex.search(line)
+ m = regex.search(rest_of_line)
if not m:
continue
directive, value = m.group('directive'), m.group('val')
- directive = r_eq_trim.sub('', directive).lower()
if cleanup:
value = cleanup(value)
- options[directive] = value
+ options[directive.lower()] = value
+ rest_of_line = regex.sub('', rest_of_line)
- for nope in ('auto_increment', 'data_directory', 'index_directory'):
+ for nope in ('auto_increment', 'data directory', 'index directory'):
options.pop(nope, None)
for opt, val in options.items():
Falls back to a 'minimal support' variant if full parse fails.
- line
- Any column-bearing line from SHOW CREATE TABLE
+ :param line: Any column-bearing line from SHOW CREATE TABLE
"""
spec = None
reflecting views for runtime use. This method formats DDL
for columns only- keys are omitted.
- `columns` is a sequence of DESCRIBE or SHOW COLUMNS 6-tuples.
- SHOW FULL COLUMNS FROM rows must be rearranged for use with
- this function.
+ :param columns: A sequence of DESCRIBE or SHOW COLUMNS 6-tuples.
+ SHOW FULL COLUMNS FROM rows must be rearranged for use with
+ this function.
"""
buffer = []
self._re_columns = []
self._pr_options = []
- self._re_options_util = {}
_final = self.preparer.final_quote
r'(?:SUB)?PARTITION')
# Table-level options (COLLATE, ENGINE, etc.)
+ # Do the string options first, since they have quoted strings we need to get rid of.
+ for option in _options_of_type_string:
+ self._add_option_string(option)
+
for option in ('ENGINE', 'TYPE', 'AUTO_INCREMENT',
'AVG_ROW_LENGTH', 'CHARACTER SET',
'DEFAULT CHARSET', 'CHECKSUM',
'KEY_BLOCK_SIZE'):
self._add_option_word(option)
- for option in (('COMMENT', 'DATA_DIRECTORY', 'INDEX_DIRECTORY',
- 'PASSWORD', 'CONNECTION')):
- self._add_option_string(option)
-
self._add_option_regex('UNION', r'\([^\)]+\)')
self._add_option_regex('TABLESPACE', r'.*? STORAGE DISK')
self._add_option_regex('RAID_TYPE',
r'\w+\s+RAID_CHUNKS\s*\=\s*\w+RAID_CHUNKSIZE\s*=\s*\w+')
- self._re_options_util['='] = _re_compile(r'\s*=\s*$')
+
+ _optional_equals = r'(?:\s*(?:=\s*)|\s+)'
def _add_option_string(self, directive):
- regex = (r'(?P<directive>%s\s*(?:=\s*)?)'
- r'(?:\x27.(?P<val>.*?)\x27(?!\x27)\x27)' %
- re.escape(directive))
+ regex = (r'(?P<directive>%s)%s'
+ r"'(?P<val>(?:[^']|'')*?)'(?!')" %
+ (re.escape(directive), self._optional_equals))
self._pr_options.append(
- _pr_compile(regex, lambda v: v.replace("''", "'")))
+ _pr_compile(regex, lambda v: v.replace("\\\\","\\").replace("''", "'")))
def _add_option_word(self, directive):
- regex = (r'(?P<directive>%s\s*(?:=\s*)?)'
- r'(?P<val>\w+)' % re.escape(directive))
+ regex = (r'(?P<directive>%s)%s'
+ r'(?P<val>\w+)' %
+ (re.escape(directive), self._optional_equals))
self._pr_options.append(_pr_compile(regex))
def _add_option_regex(self, directive, regex):
- regex = (r'(?P<directive>%s\s*(?:=\s*)?)'
- r'(?P<val>%s)' % (re.escape(directive), regex))
+ regex = (r'(?P<directive>%s)%s'
+ r'(?P<val>%s)' %
+ (re.escape(directive), self._optional_equals, regex))
self._pr_options.append(_pr_compile(regex))
+_options_of_type_string = ('COMMENT', 'DATA DIRECTORY', 'INDEX DIRECTORY',
+ 'PASSWORD', 'CONNECTION')
+
log.class_logger(MySQLTableDefinitionParser)
log.class_logger(MySQLDialect)
assert str(reflected2.c.c2.server_default.arg) == "'0'"
assert str(reflected2.c.c3.server_default.arg) == "'abc'"
assert str(reflected2.c.c4.server_default.arg) == "'2009-04-05 12:00:00'"
-
+
+ def test_reflection_with_table_options(self):
+ comment = r"""Comment types type speedily ' " \ '' Fun!"""
+
+ def_table = Table('mysql_def', MetaData(testing.db),
+ Column('c1', Integer()),
+ mysql_engine='MEMORY',
+ mysql_comment=comment,
+ mysql_default_charset='utf8',
+ mysql_auto_increment='5',
+ mysql_avg_row_length='3',
+ mysql_password='secret',
+ mysql_connection='fish',
+ )
+
+ def_table.create()
+ try:
+ reflected = Table('mysql_def', MetaData(testing.db),
+ autoload=True)
+ finally:
+ def_table.drop()
+
+ assert def_table.kwargs['mysql_engine'] == 'MEMORY'
+ assert def_table.kwargs['mysql_comment'] == comment
+ assert def_table.kwargs['mysql_default_charset'] == 'utf8'
+ assert def_table.kwargs['mysql_auto_increment'] == '5'
+ assert def_table.kwargs['mysql_avg_row_length'] == '3'
+ assert def_table.kwargs['mysql_password'] == 'secret'
+ assert def_table.kwargs['mysql_connection'] == 'fish'
+
+ assert reflected.kwargs['mysql_engine'] == 'MEMORY'
+ assert reflected.kwargs['mysql_comment'] == comment
+ assert reflected.kwargs['mysql_default charset'] == 'utf8'
+ assert reflected.kwargs['mysql_avg_row_length'] == '3'
+ assert reflected.kwargs['mysql_connection'] == 'fish'
+
+ # This field doesn't seem to be returned by mysql itself.
+ #assert reflected.kwargs['mysql_password'] == 'secret'
+
+ # This is explicitly ignored when reflecting schema.
+ #assert reflected.kwargs['mysql_auto_increment'] == '5'
+
def test_reflection_on_include_columns(self):
"""Test reflection of include_columns to be sure they respect case."""