return [row[0] for row in rs]
def has_table(self, connection, table_name, schema=None):
- quote = self.identifier_preparer.quote_identifier
- if schema is not None:
- pragma = "PRAGMA %s." % quote(schema)
- else:
- pragma = "PRAGMA "
- qtable = quote(table_name)
- statement = "%stable_info(%s)" % (pragma, qtable)
- cursor = _pragma_cursor(connection.execute(statement))
- row = cursor.fetchone()
-
- # consume remaining rows, to work around
- # http://www.sqlite.org/cvstrac/tktview?tn=1884
- while not cursor.closed and cursor.fetchone() is not None:
- pass
-
- return row is not None
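+ # an empty table_info result means the table does not exist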
+ info = self._get_table_pragma(
+ connection, "table_info", table_name, schema=schema)
+ return bool(info)
@reflection.cache
def get_view_names(self, connection, schema=None, **kw):
@reflection.cache
def get_columns(self, connection, table_name, schema=None, **kw):
- quote = self.identifier_preparer.quote_identifier
- if schema is not None:
- pragma = "PRAGMA %s." % quote(schema)
- else:
- pragma = "PRAGMA "
- qtable = quote(table_name)
- statement = "%stable_info(%s)" % (pragma, qtable)
- c = _pragma_cursor(connection.execute(statement))
+ info = self._get_table_pragma(
+ connection, "table_info", table_name, schema=schema)
- rows = c.fetchall()
columns = []
- for row in rows:
+ for row in info:
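+ # table_info rows are (cid, name, type, notnull, dflt_value, pk)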
(name, type_, nullable, default, primary_key) = (
row[1], row[2].upper(), not row[3], row[4], row[5])
@reflection.cache
def get_foreign_keys(self, connection, table_name, schema=None, **kw):
- quote = self.identifier_preparer.quote_identifier
- if schema is not None:
- pragma = "PRAGMA %s." % quote(schema)
- else:
- pragma = "PRAGMA "
- qtable = quote(table_name)
- statement = "%sforeign_key_list(%s)" % (pragma, qtable)
- c = _pragma_cursor(connection.execute(statement))
- fkeys = []
+ # sqlite makes this *extremely difficult*.
+ # First, use the pragma to get the actual FKs.
+ pragma_fks = self._get_table_pragma(
+ connection, "foreign_key_list",
+ table_name, schema=schema
+ )
+
fks = {}
- while True:
- row = c.fetchone()
- if row is None:
- break
+
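+ # foreign_key_list rows are
+ # (id, seq, table, from, to, on_update, on_delete, match)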
+ for row in pragma_fks:
(numerical_id, rtbl, lcol, rcol) = (
row[0], row[2], row[3], row[4])
- self._parse_fk(fks, fkeys, numerical_id, rtbl, lcol, rcol)
- return fkeys
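+ # sqlite won't return rcol if the table was created with
+ # "REFERENCES <tablename>" and no target column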
+ if rcol is None:
+ rcol = lcol
- def _parse_fk(self, fks, fkeys, numerical_id, rtbl, lcol, rcol):
- # sqlite won't return rcol if the table was created with REFERENCES
- # <tablename>, no col
- if rcol is None:
- rcol = lcol
+ if self._broken_fk_pragma_quotes:
+ rtbl = re.sub(r'^[\"\[`\']|[\"\]`\']$', '', rtbl)
- if self._broken_fk_pragma_quotes:
- rtbl = re.sub(r'^[\"\[`\']|[\"\]`\']$', '', rtbl)
+ if numerical_id in fks:
+ fk = fks[numerical_id]
+ else:
+ fk = fks[numerical_id] = {
+ 'name': None,
+ 'constrained_columns': [],
+ 'referred_schema': None,
+ 'referred_table': rtbl,
+ 'referred_columns': [],
+ }
- try:
- fk = fks[numerical_id]
- except KeyError:
- fk = {
- 'name': None,
- 'constrained_columns': [],
- 'referred_schema': None,
- 'referred_table': rtbl,
- 'referred_columns': [],
- }
- fkeys.append(fk)
- fks[numerical_id] = fk
-
- if lcol not in fk['constrained_columns']:
fk['constrained_columns'].append(lcol)
- if rcol not in fk['referred_columns']:
fk['referred_columns'].append(rcol)
- return fk
+
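+ # a hashable signature for an FK: constrained columns + referred
+ # table + referred columns, e.g. ('tid', 'target', 'id')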
+ def fk_sig(constrained_columns, referred_table, referred_columns):
+ return tuple(constrained_columns) + (referred_table,) + \
+ tuple(referred_columns)
+
+ # then, parse the actual SQL and attempt to find DDL that matches
+ # these constraints so the constraint names can be recovered as well.
+ # SQLite stores the DDL verbatim, in whatever form it was originally
+ # typed, so we need to be liberal here.
+
+ keys_by_signature = dict(
+ (
+ fk_sig(
+ fk['constrained_columns'],
+ fk['referred_table'], fk['referred_columns']),
+ fk
+ ) for fk in fks.values()
+ )
+
+ table_data = self._get_table_sql(connection, table_name, schema=schema)
+ if table_data is None:
+ # system tables, etc.
+ return []
+
+ def parse_fks():
+ FK_PATTERN = (
+ r'(?:CONSTRAINT (\w+) +)?'
+ r'FOREIGN KEY *\( *(.+?) *\) +'
+ r'REFERENCES +(?:(?:"(.+?)")|([a-z0-9_]+)) *\((.+?)\)'
+ )
+
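+ # FK_PATTERN matches both named and unnamed clauses, e.g.
+ # 'CONSTRAINT foo1 FOREIGN KEY(id) REFERENCES a1(id)' or
+ # 'FOREIGN KEY (tid) REFERENCES "target" (id)'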
+ for match in re.finditer(FK_PATTERN, table_data, re.I):
+ (
+ constraint_name, constrained_columns,
+ referred_quoted_name, referred_name,
+ referred_columns) = match.group(1, 2, 3, 4, 5)
+ constrained_columns = list(
+ self._find_cols_in_sig(constrained_columns))
+ if not referred_columns:
+ referred_columns = constrained_columns
+ else:
+ referred_columns = list(
+ self._find_cols_in_sig(referred_columns))
+ referred_name = referred_quoted_name or referred_name
+ yield (
+ constraint_name, constrained_columns,
+ referred_name, referred_columns)
+ fkeys = []
+
+ for (
+ constraint_name, constrained_columns,
+ referred_name, referred_columns) in parse_fks():
+ sig = fk_sig(
+ constrained_columns, referred_name, referred_columns)
+ if sig not in keys_by_signature:
+ util.warn(
+ "WARNING: SQL-parsed foreign key constraint "
+ "'%s' could not be located in PRAGMA "
+ "foreign_keys for table %s" % (
+ sig,
+ table_name
+ ))
+ continue
+ key = keys_by_signature.pop(sig)
+ key['name'] = constraint_name
+ fkeys.append(key)
+ # assume the remainder are the unnamed, inline constraints; use them
+ # as-is, since it's extremely difficult to parse inline constraints
+ # out of the DDL
+ fkeys.extend(keys_by_signature.values())
+ return fkeys
+
+ def _find_cols_in_sig(self, sig):
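+ # extract column names from a parenthesized signature, e.g.
+ # 'q , p' -> 'q', 'p'; '"some ( STUPID n,ame"' -> 'some ( STUPID n,ame'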
+ for match in re.finditer(r'(?:"(.+?)")|([a-z0-9_]+)', sig, re.I):
+ yield match.group(1) or match.group(2)
+
+ @reflection.cache
+ def get_unique_constraints(self, connection, table_name,
+ schema=None, **kw):
+
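+ # SQLite creates an implicit "sqlite_autoindex_<table>_<n>" index
+ # for each UNIQUE constraint (and for some PRIMARY KEYs); collect
+ # them keyed by column signature so they can be matched against
+ # the UNIQUE clauses parsed from the DDL below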
+ auto_index_by_sig = {}
+ for idx in self.get_indexes(
+ connection, table_name, schema=schema,
+ include_auto_indexes=True, **kw):
+ if not idx['name'].startswith("sqlite_autoindex"):
+ continue
+ sig = tuple(idx['column_names'])
+ auto_index_by_sig[sig] = idx
+
+ table_data = self._get_table_sql(
+ connection, table_name, schema=schema, **kw)
+ if not table_data:
+ return []
+
+ unique_constraints = []
+
+ def parse_uqs():
+ UNIQUE_PATTERN = r'(?:CONSTRAINT (\w+) +)?UNIQUE *\((.+?)\)'
+ INLINE_UNIQUE_PATTERN = (
+ r'(?:(".+?")|([a-z0-9]+)) '
+ r'+[a-z0-9_ ]+? +UNIQUE')
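+ # e.g. picks up the 'x' in 'x INTEGER unique', or a quoted name in
+ # '"some ( STUPID n,ame" INTEGER unique'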
+
+ for match in re.finditer(UNIQUE_PATTERN, table_data, re.I):
+ name, cols = match.group(1, 2)
+ yield name, list(self._find_cols_in_sig(cols))
+
+ # we need to match inlines as well, as we seek to differentiate
+ # a UNIQUE constraint from a UNIQUE INDEX, even though these
+ # are kind of the same thing :)
+ for match in re.finditer(INLINE_UNIQUE_PATTERN, table_data, re.I):
+ cols = list(
+ self._find_cols_in_sig(match.group(1) or match.group(2)))
+ yield None, cols
+
+ for name, cols in parse_uqs():
+ sig = tuple(cols)
+ if sig in auto_index_by_sig:
+ auto_index_by_sig.pop(sig)
+ parsed_constraint = {
+ 'name': name,
+ 'column_names': cols
+ }
+ unique_constraints.append(parsed_constraint)
+ # NOTE: auto_index_by_sig might not be empty here,
+ # the PRIMARY KEY may have an entry.
+ return unique_constraints
@reflection.cache
def get_indexes(self, connection, table_name, schema=None, **kw):
- quote = self.identifier_preparer.quote_identifier
- if schema is not None:
- pragma = "PRAGMA %s." % quote(schema)
- else:
- pragma = "PRAGMA "
- include_auto_indexes = kw.pop('include_auto_indexes', False)
- qtable = quote(table_name)
- statement = "%sindex_list(%s)" % (pragma, qtable)
- c = _pragma_cursor(connection.execute(statement))
+ pragma_indexes = self._get_table_pragma(
+ connection, "index_list", table_name, schema=schema)
indexes = []
- while True:
- row = c.fetchone()
- if row is None:
- break
+
+ include_auto_indexes = kw.pop('include_auto_indexes', False)
+ for row in pragma_indexes:
# ignore implicit primary key index.
# http://www.mail-archive.com/sqlite-users@sqlite.org/msg30517.html
- elif (not include_auto_indexes and
- row[1].startswith('sqlite_autoindex')):
+ if (not include_auto_indexes and
+ row[1].startswith('sqlite_autoindex')):
continue
indexes.append(dict(name=row[1], column_names=[], unique=row[2]))
+
# loop thru unique indexes to get the column names.
for idx in indexes:
- statement = "%sindex_info(%s)" % (pragma, quote(idx['name']))
- c = connection.execute(statement)
- cols = idx['column_names']
- while True:
- row = c.fetchone()
- if row is None:
- break
- cols.append(row[2])
+ pragma_index = self._get_table_pragma(
+ connection, "index_info", idx['name'])
+
+ for row in pragma_index:
+ idx['column_names'].append(row[2])
return indexes
@reflection.cache
- def get_unique_constraints(self, connection, table_name,
- schema=None, **kw):
+ def _get_table_sql(self, connection, table_name, schema=None, **kw):
try:
s = ("SELECT sql FROM "
" (SELECT * FROM sqlite_master UNION ALL "
s = ("SELECT sql FROM sqlite_master WHERE name = '%s' "
"AND type = 'table'") % table_name
rs = connection.execute(s)
- row = rs.fetchone()
- if row is None:
- # sqlite won't return the schema for the sqlite_master or
- # sqlite_temp_master tables from this query. These tables
- # don't have any unique constraints anyway.
- return []
- table_data = row[0]
-
- UNIQUE_PATTERN = '(?:CONSTRAINT (\w+) )?UNIQUE \(([^\)]+)\)'
- return [
- {'name': name,
- 'column_names': [col.strip(' "') for col in cols.split(',')]}
- for name, cols in re.findall(UNIQUE_PATTERN, table_data)
- ]
+ return rs.scalar()
-
-def _pragma_cursor(cursor):
- """work around SQLite issue whereby cursor.description
- is blank when PRAGMA returns no rows."""
-
- if cursor.closed:
- cursor.fetchone = lambda: None
- cursor.fetchall = lambda: []
- return cursor
+ def _get_table_pragma(self, connection, pragma, table_name, schema=None):
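+ # emit "PRAGMA <schema>.<pragma>(<table>)" with identifiers quoted,
+ # e.g. PRAGMA "main".table_info("some_table"), and return all rows;
+ # an empty list is returned when the cursor comes back closed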
+ quote = self.identifier_preparer.quote_identifier
+ if schema is not None:
+ statement = "PRAGMA %s." % quote(schema)
+ else:
+ statement = "PRAGMA "
+ qtable = quote(table_name)
+ statement = "%s%s(%s)" % (statement, pragma, qtable)
+ cursor = connection.execute(statement)
+ if not cursor.closed:
+ # work around SQLite issue whereby cursor.description
+ # is blank when PRAGMA returns no rows:
+ # http://www.sqlite.org/cvstrac/tktview?tn=1884
+ result = cursor.fetchall()
+ else:
+ result = []
+ return result
from sqlalchemy import testing
from sqlalchemy.schema import CreateTable
from sqlalchemy.engine.reflection import Inspector
+from sqlalchemy.testing import mock
class TestTypes(fixtures.TestBase, AssertsExecutionResults):
# assert j.onclause.compare(table1.c['"id"']
# == table2.c['"aid"'])
- def test_legacy_quoted_identifiers_unit(self):
- dialect = sqlite.dialect()
- dialect._broken_fk_pragma_quotes = True
-
- for row in [
- (0, 'target', 'tid', 'id'),
- (0, '"target"', 'tid', 'id'),
- (0, '[target]', 'tid', 'id'),
- (0, "'target'", 'tid', 'id'),
- (0, '`target`', 'tid', 'id'),
- ]:
- fks = {}
- fkeys = []
- dialect._parse_fk(fks, fkeys, *row)
- eq_(
- fkeys,
- [{
- 'referred_table': 'target',
- 'referred_columns': ['id'],
- 'referred_schema': None,
- 'name': None,
- 'constrained_columns': ['tid']
- }])
-
@testing.provide_metadata
def test_description_encoding(self):
# amazingly, pysqlite seems to still deliver cursor.description
e = create_engine('sqlite+pysqlite:///foo.db')
assert e.pool.__class__ is pool.NullPool
- @testing.provide_metadata
- def test_dont_reflect_autoindex(self):
- meta = self.metadata
- Table('foo', meta, Column('bar', String, primary_key=True))
- meta.create_all()
- inspector = Inspector(testing.db)
- eq_(inspector.get_indexes('foo'), [])
- eq_(
- inspector.get_indexes('foo', include_auto_indexes=True),
- [{
- 'unique': 1,
- 'name': 'sqlite_autoindex_foo_1',
- 'column_names': ['bar']}])
-
- @testing.provide_metadata
- def test_create_index_with_schema(self):
- """Test creation of index with explicit schema"""
-
- meta = self.metadata
- Table(
- 'foo', meta, Column('bar', String, index=True),
- schema='main')
- meta.create_all()
- inspector = Inspector(testing.db)
- eq_(
- inspector.get_indexes('foo', schema='main'),
- [{'unique': 0, 'name': u'ix_main_foo_bar',
- 'column_names': [u'bar']}])
-
- @testing.provide_metadata
- def test_get_unique_constraints(self):
- meta = self.metadata
- Table(
- 'foo', meta, Column('f', Integer),
- UniqueConstraint('f', name='foo_f'))
- Table(
- 'bar', meta, Column('b', Integer),
- UniqueConstraint('b', name='bar_b'),
- prefixes=['TEMPORARY'])
- meta.create_all()
- inspector = Inspector(testing.db)
- eq_(inspector.get_unique_constraints('foo'),
- [{'column_names': [u'f'], 'name': u'foo_f'}])
- eq_(inspector.get_unique_constraints('bar'),
- [{'column_names': [u'b'], 'name': u'bar_b'}])
-
- def test_get_unnamed_unique_constraints(self):
- meta = MetaData(testing.db)
- t1 = Table('foo', meta, Column('f', Integer),
- UniqueConstraint('f'))
- t2 = Table('bar', meta, Column('b', Integer),
- UniqueConstraint('b'),
- prefixes=['TEMPORARY'])
- meta.create_all()
- from sqlalchemy.engine.reflection import Inspector
- try:
- inspector = Inspector(testing.db)
- eq_(inspector.get_unique_constraints('foo'),
- [{'column_names': [u'f'], 'name': u''}])
- eq_(inspector.get_unique_constraints('bar'),
- [{'column_names': [u'b'], 'name': u''}])
- finally:
- meta.drop_all()
class AttachedMemoryDBTest(fixtures.TestBase):
assert b.c.id.references(a.c.id)
-class ReflectFKConstraintTest(fixtures.TestBase):
+class ConstraintReflectionTest(fixtures.TestBase):
__only_on__ = 'sqlite'
- def setup(self):
- testing.db.execute("CREATE TABLE a1 (id INTEGER PRIMARY KEY)")
- testing.db.execute("CREATE TABLE a2 (id INTEGER PRIMARY KEY)")
- testing.db.execute(
- "CREATE TABLE b (id INTEGER PRIMARY KEY, "
- "FOREIGN KEY(id) REFERENCES a1(id),"
- "FOREIGN KEY(id) REFERENCES a2(id)"
- ")")
- testing.db.execute(
- "CREATE TABLE c (id INTEGER, "
- "CONSTRAINT bar PRIMARY KEY(id),"
- "CONSTRAINT foo1 FOREIGN KEY(id) REFERENCES a1(id),"
- "CONSTRAINT foo2 FOREIGN KEY(id) REFERENCES a2(id)"
- ")")
+ @classmethod
+ def setup_class(cls):
+ with testing.db.begin() as conn:
+
+ conn.execute("CREATE TABLE a1 (id INTEGER PRIMARY KEY)")
+ conn.execute("CREATE TABLE a2 (id INTEGER PRIMARY KEY)")
+ conn.execute(
+ "CREATE TABLE b (id INTEGER PRIMARY KEY, "
+ "FOREIGN KEY(id) REFERENCES a1(id),"
+ "FOREIGN KEY(id) REFERENCES a2(id)"
+ ")")
+ conn.execute(
+ "CREATE TABLE c (id INTEGER, "
+ "CONSTRAINT bar PRIMARY KEY(id),"
+ "CONSTRAINT foo1 FOREIGN KEY(id) REFERENCES a1(id),"
+ "CONSTRAINT foo2 FOREIGN KEY(id) REFERENCES a2(id)"
+ ")")
+ conn.execute(
+ # the lower casing + inline is intentional here
+ "CREATE TABLE d (id INTEGER, x INTEGER unique)")
+ conn.execute(
+ # the lower casing + inline is intentional here
+ 'CREATE TABLE d1 '
+ '(id INTEGER, "some ( STUPID n,ame" INTEGER unique)')
+ conn.execute(
+ # the lower casing + inline is intentional here
+ 'CREATE TABLE d2 ( "some STUPID n,ame" INTEGER unique)')
+ conn.execute(
+ # the lower casing + inline is intentional here
+ 'CREATE TABLE d3 ( "some STUPID n,ame" INTEGER NULL unique)')
+
+ conn.execute(
+ # lower casing + inline is intentional
+ "CREATE TABLE e (id INTEGER, x INTEGER references a2(id))")
+ conn.execute(
+ 'CREATE TABLE e1 (id INTEGER, "some ( STUPID n,ame" INTEGER '
+ 'references a2 ("some ( STUPID n,ame"))')
+ conn.execute(
+ 'CREATE TABLE e2 (id INTEGER, '
+ '"some ( STUPID n,ame" INTEGER NOT NULL '
+ 'references a2 ("some ( STUPID n,ame"))')
+
+ conn.execute(
+ "CREATE TABLE f (x INTEGER, CONSTRAINT foo_fx UNIQUE(x))"
+ )
+ conn.execute(
+ "CREATE TEMPORARY TABLE g "
+ "(x INTEGER, CONSTRAINT foo_gx UNIQUE(x))"
+ )
+ conn.execute(
+ # intentional broken casing
+ "CREATE TABLE h (x INTEGER, COnstraINT foo_hx unIQUE(x))"
+ )
+ conn.execute(
+ "CREATE TABLE i (x INTEGER, y INTEGER, PRIMARY KEY(x, y))"
+ )
+ conn.execute(
+ "CREATE TABLE j (id INTEGER, q INTEGER, p INTEGER, "
+ "PRIMARY KEY(id), FOreiGN KEY(q,p) REFERENCes i(x,y))"
+ )
+ conn.execute(
+ "CREATE TABLE k (id INTEGER, q INTEGER, p INTEGER, "
+ "PRIMARY KEY(id), "
+ "conSTRAINT my_fk FOreiGN KEY ( q , p ) "
+ "REFERENCes i ( x , y ))"
+ )
- def teardown(self):
- testing.db.execute("drop table c")
- testing.db.execute("drop table b")
- testing.db.execute("drop table a1")
- testing.db.execute("drop table a2")
+ meta = MetaData()
+ Table(
+ 'l', meta, Column('bar', String, index=True),
+ schema='main')
+
+ Table(
+ 'm', meta,
+ Column('id', Integer, primary_key=True),
+ Column('x', String(30)),
+ UniqueConstraint('x')
+ )
- def test_name_is_none(self):
+ Table(
+ 'n', meta,
+ Column('id', Integer, primary_key=True),
+ Column('x', String(30)),
+ UniqueConstraint('x'),
+ prefixes=['TEMPORARY']
+ )
+
+ meta.create_all(conn)
+
+ # will contain an "autoindex"
+ conn.execute("create table o (foo varchar(20) primary key)")
+
+ @classmethod
+ def teardown_class(cls):
+ with testing.db.begin() as conn:
+ for name in [
+ "m", "main.l", "k", "j", "i", "h", "g", "f", "e", "e1",
+ "d", "d1", "d2", "c", "b", "a1", "a2"]:
+ conn.execute("drop table %s" % name)
+
+ def test_legacy_quoted_identifiers_unit(self):
+ dialect = sqlite.dialect()
+ dialect._broken_fk_pragma_quotes = True
+
+ for row in [
+ (0, None, 'target', 'tid', 'id', None),
+ (0, None, '"target"', 'tid', 'id', None),
+ (0, None, '[target]', 'tid', 'id', None),
+ (0, None, "'target'", 'tid', 'id', None),
+ (0, None, '`target`', 'tid', 'id', None),
+ ]:
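+ # stub out the pragma and DDL lookups so that get_foreign_keys()
+ # exercises only the legacy quote-stripping path for each row format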
+ def _get_table_pragma(*arg, **kw):
+ return [row]
+
+ def _get_table_sql(*arg, **kw):
+ return "CREATE TABLE foo "\
+ "(tid INTEGER, "\
+ "FOREIGN KEY(tid) REFERENCES %s (id))" % row[2]
+ with mock.patch.object(
+ dialect, "_get_table_pragma", _get_table_pragma):
+ with mock.patch.object(
+ dialect, '_get_table_sql', _get_table_sql):
+
+ fkeys = dialect.get_foreign_keys(None, 'foo')
+ eq_(
+ fkeys,
+ [{
+ 'referred_table': 'target',
+ 'referred_columns': ['id'],
+ 'referred_schema': None,
+ 'name': None,
+ 'constrained_columns': ['tid']
+ }])
+
+ def test_foreign_key_name_is_none(self):
# and not "0"
- meta = MetaData()
- b = Table('b', meta, autoload=True, autoload_with=testing.db)
+ inspector = Inspector(testing.db)
+ fks = inspector.get_foreign_keys('b')
eq_(
- [con.name for con in b.constraints],
- [None, None, None]
+ fks,
+ [
+ {'referred_table': 'a1', 'referred_columns': ['id'],
+ 'referred_schema': None, 'name': None,
+ 'constrained_columns': ['id']},
+ {'referred_table': 'a2', 'referred_columns': ['id'],
+ 'referred_schema': None, 'name': None,
+ 'constrained_columns': ['id']},
+ ]
)
- def test_name_not_none(self):
- # we don't have names for PK constraints,
- # it appears we get back None in the pragma for
- # FKs also (also it doesn't even appear to be documented on
- # sqlite's docs
- # at http://www.sqlite.org/pragma.html#pragma_foreign_key_list
- # how did we ever know that's the "name" field ??)
+ def test_foreign_key_name_is_not_none(self):
+ inspector = Inspector(testing.db)
+ fks = inspector.get_foreign_keys('c')
+ eq_(
+ fks,
+ [
+ {
+ 'referred_table': 'a1', 'referred_columns': ['id'],
+ 'referred_schema': None, 'name': 'foo1',
+ 'constrained_columns': ['id']},
+ {
+ 'referred_table': 'a2', 'referred_columns': ['id'],
+ 'referred_schema': None, 'name': 'foo2',
+ 'constrained_columns': ['id']},
+ ]
+ )
- meta = MetaData()
- c = Table('c', meta, autoload=True, autoload_with=testing.db)
+ def test_unnamed_inline_foreign_key(self):
+ inspector = Inspector(testing.db)
+ fks = inspector.get_foreign_keys('e')
+ eq_(
+ fks,
+ [{
+ 'referred_table': 'a2', 'referred_columns': ['id'],
+ 'referred_schema': None,
+ 'name': None, 'constrained_columns': ['x']
+ }]
+ )
+
+ def test_unnamed_inline_foreign_key_quoted(self):
+ inspector = Inspector(testing.db)
+ fks = inspector.get_foreign_keys('e1')
+ eq_(
+ fks,
+ [{
+ 'referred_table': 'a2',
+ 'referred_columns': ['some ( STUPID n,ame'],
+ 'referred_schema': None,
+ 'name': None, 'constrained_columns': ['some ( STUPID n,ame']
+ }]
+ )
+ fks = inspector.get_foreign_keys('e2')
+ eq_(
+ fks,
+ [{
+ 'referred_table': 'a2',
+ 'referred_columns': ['some ( STUPID n,ame'],
+ 'referred_schema': None,
+ 'name': None, 'constrained_columns': ['some ( STUPID n,ame']
+ }]
+ )
+
+ def test_foreign_key_composite_broken_casing(self):
+ inspector = Inspector(testing.db)
+ fks = inspector.get_foreign_keys('j')
+ eq_(
+ fks,
+ [{
+ 'referred_table': 'i',
+ 'referred_columns': ['x', 'y'],
+ 'referred_schema': None, 'name': None,
+ 'constrained_columns': ['q', 'p']}]
+ )
+ fks = inspector.get_foreign_keys('k')
+ eq_(
+ fks,
+ [{'referred_table': 'i', 'referred_columns': ['x', 'y'],
+ 'referred_schema': None, 'name': 'my_fk',
+ 'constrained_columns': ['q', 'p']}]
+ )
+
+ def test_dont_reflect_autoindex(self):
+ inspector = Inspector(testing.db)
+ eq_(inspector.get_indexes('o'), [])
+ eq_(
+ inspector.get_indexes('o', include_auto_indexes=True),
+ [{
+ 'unique': 1,
+ 'name': 'sqlite_autoindex_o_1',
+ 'column_names': ['foo']}])
+
+ def test_create_index_with_schema(self):
+ """Test creation of index with explicit schema"""
+
+ inspector = Inspector(testing.db)
+ eq_(
+ inspector.get_indexes('l', schema='main'),
+ [{'unique': 0, 'name': u'ix_main_l_bar',
+ 'column_names': [u'bar']}])
+
+ def test_unique_constraint_named(self):
+ inspector = Inspector(testing.db)
+ eq_(
+ inspector.get_unique_constraints("f"),
+ [{'column_names': ['x'], 'name': 'foo_fx'}]
+ )
+
+ def test_unique_constraint_named_broken_casing(self):
+ inspector = Inspector(testing.db)
+ eq_(
+ inspector.get_unique_constraints("h"),
+ [{'column_names': ['x'], 'name': 'foo_hx'}]
+ )
+
+ def test_unique_constraint_named_broken_temp(self):
+ inspector = Inspector(testing.db)
+ eq_(
+ inspector.get_unique_constraints("g"),
+ [{'column_names': ['x'], 'name': 'foo_gx'}]
+ )
+
+ def test_unique_constraint_unnamed_inline(self):
+ inspector = Inspector(testing.db)
+ eq_(
+ inspector.get_unique_constraints("d"),
+ [{'column_names': ['x'], 'name': None}]
+ )
+
+ def test_unique_constraint_unnamed_inline_quoted(self):
+ inspector = Inspector(testing.db)
+ eq_(
+ inspector.get_unique_constraints("d1"),
+ [{'column_names': ['some ( STUPID n,ame'], 'name': None}]
+ )
+ eq_(
+ inspector.get_unique_constraints("d2"),
+ [{'column_names': ['some STUPID n,ame'], 'name': None}]
+ )
+ eq_(
+ inspector.get_unique_constraints("d3"),
+ [{'column_names': ['some STUPID n,ame'], 'name': None}]
+ )
+
+ def test_unique_constraint_unnamed_normal(self):
+ inspector = Inspector(testing.db)
+ eq_(
+ inspector.get_unique_constraints("m"),
+ [{'column_names': ['x'], 'name': None}]
+ )
+
+ def test_unique_constraint_unnamed_normal_temporary(self):
+ inspector = Inspector(testing.db)
eq_(
- set([con.name for con in c.constraints]),
- set([None, None])
+ inspector.get_unique_constraints("n"),
+ [{'column_names': ['x'], 'name': None}]
)