pass
elif line.startswith("CREATE "):
self._parse_table_name(line, state)
+ elif "PARTITION" in line:
+ self._parse_partition_options(line, state)
# Not present in real reflection, but may be if
# loading from a file.
elif not line:
for opt, val in options.items():
state.table_options["%s_%s" % (self.dialect.name, opt)] = val
+ def _parse_partition_options(self, line, state):
+ options = {}
+ new_line = line[:]
+
+ while new_line.startswith("(") or new_line.startswith(" "):
+ new_line = new_line[1:]
+
+ for regex, cleanup in self._pr_options:
+ m = regex.search(new_line)
+ if not m or "PARTITION" not in regex.pattern:
+ continue
+
+ directive = m.group("directive")
+ directive = directive.lower()
+ is_subpartition = directive == "subpartition"
+
+ if directive == "partition" or is_subpartition:
+ new_line = new_line.replace(") */", "")
+ new_line = new_line.replace(",", "")
+ if is_subpartition and new_line.endswith(")"):
+ new_line = new_line[:-1]
+ if self.dialect.name == "mariadb" and new_line.endswith(")"):
+ if (
+ "MAXVALUE" in new_line
+ or "MINVALUE" in new_line
+ or "ENGINE" in new_line
+ ):
+ # final line of MariaDB partition endswith ")"
+ new_line = new_line[:-1]
+
+ defs = "%s_%s_definitions" % (self.dialect.name, directive)
+ options[defs] = new_line
+
+ else:
+ directive = directive.replace(" ", "_")
+ value = m.group("val")
+ if cleanup:
+ value = cleanup(value)
+ options[directive] = value
+ break
+
+ for opt, val in options.items():
+ part_def = "%s_partition_definitions" % (self.dialect.name)
+ subpart_def = "%s_subpartition_definitions" % (self.dialect.name)
+ if opt == part_def or opt == subpart_def:
+ # builds a string of definitions
+ if opt not in state.table_options:
+ state.table_options[opt] = val
+ else:
+ state.table_options[opt] = "%s, %s" % (
+ state.table_options[opt],
+ val,
+ )
+ else:
+ state.table_options["%s_%s" % (self.dialect.name, opt)] = val
+
def _parse_column(self, line, state):
"""Extract column details.
"PACK_KEYS",
"ROW_FORMAT",
"KEY_BLOCK_SIZE",
+ "STATS_SAMPLE_PAGES",
):
self._add_option_word(option)
+ for option in (
+ "PARTITION BY",
+ "SUBPARTITION BY",
+ "PARTITIONS",
+ "SUBPARTITIONS",
+ "PARTITION",
+ "SUBPARTITION",
+ ):
+ self._add_partition_option_word(option)
+
self._add_option_regex("UNION", r"\([^\)]+\)")
self._add_option_regex("TABLESPACE", r".*? STORAGE DISK")
self._add_option_regex(
)
self._pr_options.append(_pr_compile(regex))
+ def _add_partition_option_word(self, directive):
+ if directive == "PARTITION BY" or directive == "SUBPARTITION BY":
+ regex = r"(?<!\S)(?P<directive>%s)%s" r"(?P<val>\w+.*)" % (
+ re.escape(directive),
+ self._optional_equals,
+ )
+ elif directive == "SUBPARTITIONS" or directive == "PARTITIONS":
+ regex = r"(?<!\S)(?P<directive>%s)%s" r"(?P<val>\d+)" % (
+ re.escape(directive),
+ self._optional_equals,
+ )
+ else:
+ regex = r"(?<!\S)(?P<directive>%s)(?!\S)" % (re.escape(directive),)
+ self._pr_options.append(_pr_compile(regex))
+
def _add_option_regex(self, directive, regex):
regex = r"(?P<directive>%s)%s" r"(?P<val>%s)" % (
re.escape(directive),
mysql_avg_row_length="3",
mysql_password="secret",
mysql_connection="fish",
+ mysql_stats_sample_pages="4",
)
def_table = Table(
assert def_table.kwargs["mysql_avg_row_length"] == "3"
assert def_table.kwargs["mysql_password"] == "secret"
assert def_table.kwargs["mysql_connection"] == "fish"
+ assert def_table.kwargs["mysql_stats_sample_pages"] == "4"
assert reflected.kwargs["mysql_engine"] == "MEMORY"
)
assert reflected.kwargs["mysql_avg_row_length"] == "3"
assert reflected.kwargs["mysql_connection"] == "fish"
+ assert reflected.kwargs["mysql_stats_sample_pages"] == "4"
# This field doesn't seem to be returned by mysql itself.
# assert reflected.kwargs['mysql_password'] == 'secret'
# This is explicitly ignored when reflecting schema.
# assert reflected.kwargs['mysql_auto_increment'] == '5'
+ def _norm_str(self, value, is_definition=False):
+ if is_definition:
+ # partition names on MariaDB contain backticks
+ return value.replace("`", "")
+ else:
+ return value.replace("`", "").replace(" ", "").lower()
+
+ def _norm_reflected_table(self, dialect_name, kwords):
+ dialect_name += "_"
+ normalised_table = {}
+ for k, v in kwords.items():
+ option = k.replace(dialect_name, "")
+ normalised_table[option] = self._norm_str(v)
+ return normalised_table
+
+ def _get_dialect_key(self):
+ if testing.against("mariadb"):
+ return "mariadb"
+ else:
+ return "mysql"
+
+ def test_reflection_with_partition_options(self, metadata, connection):
+ base_kwargs = dict(
+ engine="InnoDB",
+ default_charset="utf8",
+ partition_by="HASH(MONTH(c2))",
+ partitions="6",
+ )
+ dk = self._get_dialect_key()
+
+ kwargs = {f"{dk}_{key}": v for key, v in base_kwargs.items()}
+
+ def_table = Table(
+ "mysql_def",
+ metadata,
+ Column("c1", Integer()),
+ Column("c2", DateTime),
+ **kwargs,
+ )
+ eq_(def_table.kwargs[f"{dk}_partition_by"], "HASH(MONTH(c2))")
+ eq_(def_table.kwargs[f"{dk}_partitions"], "6")
+
+ metadata.create_all(connection)
+ reflected = Table("mysql_def", MetaData(), autoload_with=connection)
+ ref_kw = self._norm_reflected_table(dk, reflected.kwargs)
+ eq_(ref_kw["partition_by"], "hash(month(c2))")
+ eq_(ref_kw["partitions"], "6")
+
+ def test_reflection_with_subpartition_options(self, connection, metadata):
+
+ subpartititon_text = """HASH (TO_DAYS (c2))
+ SUBPARTITIONS 2(
+ PARTITION p0 VALUES LESS THAN (1990),
+ PARTITION p1 VALUES LESS THAN (2000),
+ PARTITION p2 VALUES LESS THAN MAXVALUE
+ );"""
+
+ base_kwargs = dict(
+ engine="InnoDB",
+ default_charset="utf8",
+ partition_by="RANGE(YEAR(c2))",
+ subpartition_by=subpartititon_text,
+ )
+ dk = self._get_dialect_key()
+ kwargs = {f"{dk}_{key}": v for key, v in base_kwargs.items()}
+
+ def_table = Table(
+ "mysql_def",
+ metadata,
+ Column("c1", Integer()),
+ Column("c2", DateTime),
+ **kwargs,
+ )
+
+ eq_(def_table.kwargs[f"{dk}_partition_by"], "RANGE(YEAR(c2))")
+ metadata.create_all(connection)
+
+ reflected = Table("mysql_def", MetaData(), autoload_with=connection)
+ ref_kw = self._norm_reflected_table(dk, reflected.kwargs)
+ opts = reflected.dialect_options[dk]
+
+ eq_(ref_kw["partition_by"], "range(year(c2))")
+ eq_(ref_kw["subpartition_by"], "hash(to_days(c2))")
+ eq_(ref_kw["subpartitions"], "2")
+ part_definitions = (
+ "PARTITION p0 VALUES LESS THAN (1990) ENGINE = InnoDB,"
+ " PARTITION p1 VALUES LESS THAN (2000) ENGINE = InnoDB,"
+ " PARTITION p2 VALUES LESS THAN MAXVALUE ENGINE = InnoDB"
+ )
+ eq_(
+ self._norm_str(opts["partition_definitions"], True),
+ part_definitions,
+ )
+
+ def test_reflection_with_subpartition_options_two(
+ self, connection, metadata
+ ):
+ partititon_text = """RANGE (YEAR (c2))
+ SUBPARTITION BY HASH( TO_DAYS(c2))(
+ PARTITION p0 VALUES LESS THAN (1990)(
+ SUBPARTITION s0,
+ SUBPARTITION s1
+ ),
+ PARTITION p1 VALUES LESS THAN (2000)(
+ SUBPARTITION s2,
+ SUBPARTITION s3
+ ),
+ PARTITION p2 VALUES LESS THAN MAXVALUE(
+ SUBPARTITION s4,
+ SUBPARTITION s5
+ )
+ );"""
+
+ base_kwargs = dict(
+ engine="InnoDB",
+ default_charset="utf8",
+ partition_by=partititon_text,
+ )
+ dk = self._get_dialect_key()
+ kwargs = {f"{dk}_{key}": v for key, v in base_kwargs.items()}
+
+ def_table = Table(
+ "mysql_def",
+ metadata,
+ Column("c1", Integer()),
+ Column("c2", DateTime),
+ **kwargs,
+ )
+ eq_(def_table.kwargs[f"{dk}_partition_by"], partititon_text)
+
+ metadata.create_all(connection)
+ reflected = Table("mysql_def", MetaData(), autoload_with=connection)
+
+ ref_kw = self._norm_reflected_table(dk, reflected.kwargs)
+ opts = reflected.dialect_options[dk]
+ eq_(ref_kw["partition_by"], "range(year(c2))")
+ eq_(ref_kw["subpartition_by"], "hash(to_days(c2))")
+
+ part_definitions = (
+ "PARTITION p0 VALUES LESS THAN (1990),"
+ " PARTITION p1 VALUES LESS THAN (2000),"
+ " PARTITION p2 VALUES LESS THAN MAXVALUE"
+ )
+ subpart_definitions = (
+ "SUBPARTITION s0 ENGINE = InnoDB,"
+ " SUBPARTITION s1 ENGINE = InnoDB,"
+ " SUBPARTITION s2 ENGINE = InnoDB,"
+ " SUBPARTITION s3 ENGINE = InnoDB,"
+ " SUBPARTITION s4 ENGINE = InnoDB,"
+ " SUBPARTITION s5 ENGINE = InnoDB"
+ )
+
+ eq_(
+ self._norm_str(opts["partition_definitions"], True),
+ part_definitions,
+ )
+ eq_(
+ self._norm_str(opts["subpartition_definitions"], True),
+ subpart_definitions,
+ )
+
def test_reflection_on_include_columns(self, metadata, connection):
"""Test reflection of include_columns to be sure they respect case."""