_impls = {}
-
-Params = namedtuple("Params", ["type", "args", "kwargs"])
+Params = namedtuple("Params", ["token0", "tokens", "args", "kwargs"])
class DefaultImpl(with_metaclass(ImplMeta)):
transactional_ddl = False
command_terminator = ";"
type_synonyms = ({"NUMERIC", "DECIMAL"},)
+ type_arg_extract = ()
def __init__(
self,
# TIMESTAMP WITH TIMEZONE
# INTEGER UNSIGNED
# INTEGER (10) UNSIGNED
+ # INTEGER(10) UNSIGNED
+ # varchar character set utf8
#
- # "ext" are the words after the parenthesis, if any, but if there
- # are no parenthesis, then these are part of "col". so they are
- # moved together for normalization purposes
- matches = re.search(
- r"^(?P<col>[^()]*)(?:\((?P<params>.*?)\))?(?P<ext>[^()]*)?$",
- definition,
- ).groupdict(default="")
- col_type = matches["col"]
- if matches["ext"]:
- col_type = col_type.strip() + " " + matches["ext"].strip()
-
- params = Params(col_type, [], {})
- for term in re.findall("[^(),]+", matches["params"] or ""):
- if "=" in term:
- key, val = term.split("=")
- params.kwargs[key.strip()] = val.strip()
+
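+    # split the rendered type into word tokens and "(...)" argument groups;
+    # the last parenthesized group, if any, supplies the type arguments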
+ tokens = re.findall(r"[\w\-_]+|\(.+?\)", definition)
+
+ term_tokens = []
+ paren_term = None
+
+ for token in tokens:
+ if re.match(r"^\(.*\)$", token):
+ paren_term = token
else:
- params.args.append(term.strip())
+ term_tokens.append(token)
+
+ params = Params(term_tokens[0], term_tokens[1:], [], {})
+
+ if paren_term:
+ for term in re.findall("[^(),]+", paren_term):
+ if "=" in term:
+ key, val = term.split("=")
+ params.kwargs[key.strip()] = val.strip()
+ else:
+ params.args.append(term.strip())
+
return params
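As an illustration of what the new tokenizer produces, here is a minimal standalone sketch; tokenize is a stand-in name for the patched _tokenize_column_type, the regexes and Params fields are copied from the patch, and the sample inputs mirror the comments above:

    import re
    from collections import namedtuple

    Params = namedtuple("Params", ["token0", "tokens", "args", "kwargs"])

    def tokenize(definition):
        # word tokens plus "(...)" argument groups, as in the patch
        tokens = re.findall(r"[\w\-_]+|\(.+?\)", definition)
        term_tokens = []
        paren_term = None
        for token in tokens:
            if re.match(r"^\(.*\)$", token):
                paren_term = token
            else:
                term_tokens.append(token)
        params = Params(term_tokens[0], term_tokens[1:], [], {})
        if paren_term:
            for term in re.findall("[^(),]+", paren_term):
                if "=" in term:
                    key, val = term.split("=")
                    params.kwargs[key.strip()] = val.strip()
                else:
                    params.args.append(term.strip())
        return params

    print(tokenize("INTEGER(10) UNSIGNED"))
    # Params(token0='INTEGER', tokens=['UNSIGNED'], args=['10'], kwargs={})
    print(tokenize("varchar character set utf8"))
    # Params(token0='varchar', tokens=['character', 'set', 'utf8'], args=[], kwargs={})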
- def _column_types_match(self, inspector_type, metadata_type):
- if inspector_type == metadata_type:
+ def _column_types_match(self, inspector_params, metadata_params):
+ if inspector_params.token0 == metadata_params.token0:
return True
+
synonyms = [{t.lower() for t in batch} for batch in self.type_synonyms]
+ inspector_all_terms = " ".join(
+ [inspector_params.token0] + inspector_params.tokens
+ )
+ metadata_all_terms = " ".join(
+ [metadata_params.token0] + metadata_params.tokens
+ )
+
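+    # a batch matches when both full phrases, or both leading tokens,
+    # are listed as synonyms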
for batch in synonyms:
- if {inspector_type, metadata_type}.issubset(batch):
+ if {inspector_all_terms, metadata_all_terms}.issubset(batch) or {
+ inspector_params.token0,
+ metadata_params.token0,
+ }.issubset(batch):
return True
return False
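The synonym check can be exercised the same way; this free-function sketch assumes lower-cased renderings and Params values from the tokenize sketch above, with the single {"NUMERIC", "DECIMAL"} batch that DefaultImpl declares:

    def column_types_match(insp, meta, type_synonyms=({"NUMERIC", "DECIMAL"},)):
        # identical leading tokens match immediately
        if insp.token0 == meta.token0:
            return True
        synonyms = [{t.lower() for t in batch} for batch in type_synonyms]
        insp_all = " ".join([insp.token0] + insp.tokens)
        meta_all = " ".join([meta.token0] + meta.tokens)
        for batch in synonyms:
            # either the full phrases or just the leading tokens may be synonymous
            if {insp_all, meta_all}.issubset(batch) or {
                insp.token0, meta.token0
            }.issubset(batch):
                return True
        return False

    assert column_types_match(tokenize("numeric(10, 2)"), tokenize("decimal(10, 2)"))
    assert not column_types_match(tokenize("integer"), tokenize("varchar(30)"))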
we want to make sure they are the same. However, if only one
        specifies it, don't flag it for being less specific
"""
- # py27- .keys is a set in py36
- for kw in set(inspected_params.kwargs.keys()) & set(
- meta_params.kwargs.keys()
+
+ if (
+ len(meta_params.tokens) == len(inspected_params.tokens)
+ and meta_params.tokens != inspected_params.tokens
):
- if inspected_params.kwargs[kw] != meta_params.kwargs[kw]:
- return False
+ return False
+
+ if (
+ len(meta_params.args) == len(inspected_params.args)
+ and meta_params.args != inspected_params.args
+ ):
+ return False
+
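+    # extract comparable values (e.g. a character set) from the
+    # qualifying tokens on both sides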
+ insp = " ".join(inspected_params.tokens).lower()
+ meta = " ".join(meta_params.tokens).lower()
+
+ for reg in self.type_arg_extract:
+ mi = re.search(reg, insp)
+ mm = re.search(reg, meta)
- # compare the positional arguments to the extent that they are
- # present in the SQL form of the metadata type compared to
- # the inspected type. Example: Oracle NUMBER(17) and NUMBER(17, 0)
- # are equivalent. As it's not clear if some database types might
- # ignore additional parameters vs. they aren't actually there and
- # need to be added, for now we are still favoring avoiding false
- # positives
- for meta, inspect in zip(meta_params.args, inspected_params.args):
- if meta != inspect:
+ if mi and mm and mi.group(1) != mm.group(1):
return False
return True
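The argument check can be sketched the same way; the character-set pattern below is an illustrative assumption about the kind of expression a dialect impl might place in type_arg_extract, not necessarily what any shipped impl uses:

    def column_args_match(inspected, meta, type_arg_extract=()):
        # token/arg lists of equal length must agree exactly; unequal
        # lengths mean one side is merely less specific, which is tolerated
        if (
            len(meta.tokens) == len(inspected.tokens)
            and meta.tokens != inspected.tokens
        ):
            return False
        if len(meta.args) == len(inspected.args) and meta.args != inspected.args:
            return False
        insp_terms = " ".join(inspected.tokens).lower()
        meta_terms = " ".join(meta.tokens).lower()
        for reg in type_arg_extract:
            mi = re.search(reg, insp_terms)
            mm = re.search(reg, meta_terms)
            if mi and mm and mi.group(1) != mm.group(1):
                return False
        return True

    # NUMBER(17) vs NUMBER(17, 0): one side is just less specific -> no change
    assert column_args_match(tokenize("number(17)"), tokenize("number(17, 0)"))

    # the token lists differ in length, so the first check passes, but the
    # extracted character sets still disagree -> flagged as a change
    charset = [r"character set ([\w\-_]+)"]
    assert not column_args_match(
        tokenize("varchar(200) character set utf8 collate utf8_bin"),
        tokenize("varchar(200) character set latin1"),
        type_arg_extract=charset,
    )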
"""
inspector_params = self._tokenize_column_type(inspector_column)
metadata_params = self._tokenize_column_type(metadata_column)
- if not self._column_types_match(
- inspector_params.type, metadata_params.type
- ):
+
+        if not self._column_types_match(inspector_params, metadata_params):
return True
if not self._column_args_match(inspector_params, metadata_params):
return True
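Putting the sketches together: a reflected numeric(10, 2) and a declared decimal(10, 2) reconcile end to end, so the comparison above would not report a change for them:

    insp = tokenize("numeric(10, 2)")   # as reflected from the database
    meta = tokenize("decimal(10, 2)")   # as rendered from the metadata
    is_changed = not (
        column_types_match(insp, meta) and column_args_match(insp, meta)
    )
    assert not is_changed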
# that have not changed.
is_(self._compare_columns(cola, cola), False)
+ # TODO: ideally the backend-specific types would be tested
+ # within the test suites for those backends.
@testing.combinations(
(String(32), VARCHAR(32), False),
(VARCHAR(6), String(6), False),
(Unicode(32), VARCHAR(32), False, config.requirements.unicode_string),
(VARCHAR(6), VARCHAR(12), True),
(VARCHAR(6), String(12), True),
+ (Integer(), String(10), True),
+ (String(10), Integer(), True),
+ (
+ Unicode(30, collation="en_US"),
+ Unicode(30, collation="en_US"),
+ False, # unfortunately dialects don't seem to consistently
+ # reflect collations right now so we can't test for
+ # positives here
+ config.requirements.postgresql,
+ ),
+ (
+ mysql.VARCHAR(200, charset="utf8"),
+ Unicode(200),
+ False,
+ config.requirements.mysql,
+ ),
+ (
+ mysql.VARCHAR(200, charset="latin1"),
+ mysql.VARCHAR(200, charset="utf-8"),
+ True,
+ config.requirements.mysql + config.requirements.sqlalchemy_13,
+ ),
+ (
+ String(255, collation="utf8_bin"),
+ String(255),
+ False,
+ config.requirements.mysql,
+ ),
+ (
+ String(255, collation="utf8_bin"),
+ String(255, collation="latin1_bin"),
+ True,
+ config.requirements.mysql + config.requirements.sqlalchemy_13,
+ ),
)
def test_string_comparisons(self, cola, colb, expect_changes):
is_(self._compare_columns(cola, colb), expect_changes)