else:
return False
+ @reflection.cache
+ def get_table_names(self, connection, schema=None, **kw):
+ # Return the normalized names of all user tables.
+ # rdb$system_flag=0 excludes system relations; a NULL rdb$view_context
+ # means the relation is a table rather than a view.
+ # NOTE(review): `schema` is accepted for interface compatibility but is
+ # not used by the query -- confirm this is intentional for this dialect.
+ s = """
+ SELECT DISTINCT rdb$relation_name
+ FROM rdb$relation_fields WHERE
+ rdb$system_flag=0 AND rdb$view_context IS NULL
+ """
+ return [self._normalize_name(row[0]) for row in connection.execute(s)]
+
@reflection.cache
def get_primary_keys(self, connection, table_name, schema=None, **kw):
# Query to extract the PK/FK constrained fields of the given table
return [row[0] for row in self._compat_fetchall(rp, charset=charset)\
if row[1] == 'VIEW']
+ @reflection.cache
+ def get_table_options(self, connection, table_name, schema=None, **kw):
+
+ # Return whatever table-level options the parsed DDL state recorded
+ # for `table_name` (contents depend on _parsed_state_or_create).
+ parsed_state = self._parsed_state_or_create(connection, table_name, schema, **kw)
+ return parsed_state.table_options
+
@reflection.cache
def get_columns(self, connection, table_name, schema=None, **kw):
def reflecttable(self, connection, table, include_columns):
"""Load column definitions from the server."""
- charset = self._connection_charset
- self._adjust_casing(table)
- parsed_state = self._setup_parser(connection, table.name, table.schema)
-
- # check the table name
- if parsed_state.table_name is not None:
- table.name = parsed_state.table_name
- # apply table options
- if parsed_state.table_options:
- table.kwargs.update(parsed_state.table_options)
- # columns
- for col_d in self.get_columns(connection, table.name, table.schema,
- parsed_state=parsed_state):
- name = col_d['name']
- coltype = col_d['type']
- nullable = col_d.get('nullable', True)
- default = col_d['default']
- colargs = col_d['colargs']
- if include_columns and name not in include_columns:
- continue
- if default is not None and default != 'NULL':
- colargs.append(sa_schema.DefaultClause(default))
- # Can I not specify nullable=True?
- col_kw = {}
- if nullable is False:
- col_kw['nullable'] = False
- if 'autoincrement' in col_d:
- col_kw['autoincrement'] = col_d['autoincrement']
- table.append_column(sa_schema.Column(name, coltype,
- *colargs, **col_kw))
-
- # primary keys
- pkey_cols = self.get_primary_keys(connection, table.name,
- table.schema, parsed_state=parsed_state)
- if include_columns:
- pkey_cols = [p for p in pkey_cols if p in include_columns]
- pkey = sa_schema.PrimaryKeyConstraint()
- for col in [table.c[name] for name in pkey_cols]:
- pkey.append_column(col)
- table.append_constraint(pkey)
-
- fkeys = self.get_foreign_keys(connection, table.name,
- table.schema, parsed_state=parsed_state)
- # foreign keys
- for fkey_d in fkeys:
- conname = fkey_d['name']
- loc_names = fkey_d['constrained_columns']
- ref_schema = fkey_d['referred_schema']
- ref_name = fkey_d['referred_table']
- ref_names = fkey_d['referred_columns']
- con_kw = fkey_d['options']
- refspec = []
-
- # load related table
- ref_key = sa_schema._get_table_key(ref_name, ref_schema)
- if ref_key in table.metadata.tables:
- ref_table = table.metadata.tables[ref_key]
- else:
- ref_table = sa_schema.Table(
- ref_name, table.metadata, schema=ref_schema,
- autoload=True, autoload_with=connection)
-
- if ref_schema:
- refspec = [".".join([ref_schema, ref_name, column]) for column in ref_names]
- else:
- refspec = [".".join([ref_name, column]) for column in ref_names]
- key = sa_schema.ForeignKeyConstraint(loc_names, refspec,
- link_to_name=True, **con_kw)
- table.append_constraint(key)
-
- # Indexes
- indexes = self.get_indexes(connection, table.name, table.schema,
- parsed_state=parsed_state)
- for index_d in indexes:
- name = index_d['name']
- col_names = index_d['column_names']
- unique = index_d['unique']
- flavor = index_d['type']
- if include_columns and \
- not set(col_names).issubset(include_columns):
- self.logger.info(
- "Omitting %s KEY for (%s), key covers ommitted columns." %
- (flavor, ', '.join(col_names)))
- continue
- key = sa_schema.Index(name, unique=unique)
- for col in [table.c[name] for name in col_names]:
- key.append_column(col)
+ # The dialect-specific reflection loop removed above is replaced by a
+ # delegation to the dialect-neutral Inspector, so all dialects share
+ # one reflecttable implementation.
+ insp = reflection.Inspector.from_engine(connection)
+ return insp.reflecttable(table, include_columns)
def _adjust_casing(self, table, charset=None):
"""Adjust Table name to the server case sensitivity, if needed."""
col_args, col_kw = [], {}
# NOT NULL
+ col_kw['nullable'] = True
if spec.get('notnull', False):
col_kw['nullable'] = False
default = sql.text(default)
else:
default = default[1:-1]
- col_d = dict(name=name, type=type_instance, colargs=col_args,
+ elif default == 'NULL':
+ # eliminates the need to deal with this later.
+ default = None
+ col_d = dict(name=name, type=type_instance, attrs={},
default=default)
col_d.update(col_kw)
state.columns.append(col_d)
return [self._normalize_name(row[0]) for row in cursor]
@reflection.cache
- def get_columns(self, connection, table_name, schema=None,
- resolve_synonyms=False, dblink='', **kw):
+ def get_columns(self, connection, table_name, schema=None, **kw):
+ """
+
+ kw arguments can be:
+
+ oracle_resolve_synonyms
+
+ dblink
+
+ """
+
+ resolve_synonyms = kw.get('oracle_resolve_synonyms', False)
+ dblink = kw.get('dblink', '')
-
(table_name, schema, dblink, synonym) = \
self._prepare_reflection_args(connection, table_name, schema,
resolve_synonyms, dblink)
util.warn("Did not recognize type '%s' of column '%s'" %
(coltype, colname))
coltype = sqltypes.NULLTYPE
-
- colargs = []
- if default is not None:
- colargs.append(sa_schema.DefaultClause(sql.text(default)))
cdict = {
'name': colname,
'type': coltype,
'nullable': nullable,
'default': default,
- 'attrs': colargs
+ 'attrs': {}
}
columns.append(cdict)
return columns
indexes = []
last_index_name = None
pkeys = self.get_primary_keys(connection, table_name, schema,
- resolve_synonyms, dblink,
- info_cache=info_cache)
+ resolve_synonyms=resolve_synonyms,
+ dblink=dblink,
+ info_cache=kw.get('info_cache'))
uniqueness = dict(NONUNIQUE=False, UNIQUE=True)
for rset in rp:
# don't include the primary key columns
return constraint_data
@reflection.cache
- def get_primary_keys(self, connection, table_name, schema=None,
- resolve_synonyms=False, dblink='', **kw):
+ def get_primary_keys(self, connection, table_name, schema=None, **kw):
+ """
+
+ kw arguments can be:
+
+ oracle_resolve_synonyms
+
+ dblink
+
+ """
+
+ resolve_synonyms = kw.get('oracle_resolve_synonyms', False)
+ dblink = kw.get('dblink', '')
+
(table_name, schema, dblink, synonym) = \
self._prepare_reflection_args(connection, table_name, schema,
resolve_synonyms, dblink)
return pkeys
@reflection.cache
- def get_foreign_keys(self, connection, table_name, schema=None,
- resolve_synonyms=False, dblink='', **kw):
+ def get_foreign_keys(self, connection, table_name, schema=None, **kw):
+ """
+
+ kw arguments can be:
+
+ oracle_resolve_synonyms
+
+ dblink
+
+ """
+
+ requested_schema = schema # to check later on
+ resolve_synonyms = kw.get('oracle_resolve_synonyms', False)
+ dblink = kw.get('dblink', '')
+
(table_name, schema, dblink, synonym) = \
self._prepare_reflection_args(connection, table_name, schema,
resolve_synonyms, dblink)
fk[1].append(remote_column)
for (name, value) in fks.items():
if remote_table and value[1]:
+ if requested_schema is None and remote_owner is not None:
+ default_schema = self.get_default_schema_name(connection)
+ if remote_owner.lower() == default_schema.lower():
+ remote_owner = None
fkey_d = {
'name' : name,
'constrained_columns' : value[0],
return view_def
def reflecttable(self, connection, table, include_columns):
- preparer = self.identifier_preparer
- info_cache = {}
-
- resolve_synonyms = table.kwargs.get('oracle_resolve_synonyms', False)
-
- (actual_name, owner, dblink, synonym) = \
- self._prepare_reflection_args(connection, table.name, table.schema,
- resolve_synonyms)
-
- # columns
- columns = self.get_columns(connection, actual_name, owner, dblink,
- info_cache=info_cache)
- for cdict in columns:
- colname = cdict['name']
- coltype = cdict['type']
- nullable = cdict['nullable']
- colargs = cdict['attrs']
- if include_columns and colname not in include_columns:
- continue
- table.append_column(sa_schema.Column(colname, coltype,
- nullable=nullable, *colargs))
- if not table.columns:
- raise AssertionError("Couldn't find any column information for table %s" % actual_name)
-
- # primary keys
- for pkcol in self.get_primary_keys(connection, actual_name, owner,
- dblink, info_cache=info_cache):
- if pkcol in table.c:
- table.primary_key.add(table.c[pkcol])
-
- # foreign keys
- fks = {}
- fkeys = []
- fkeys = self.get_foreign_keys(connection, actual_name, owner,
- resolve_synonyms, dblink,
- info_cache=info_cache)
- refspecs = []
- for fkey_d in fkeys:
- conname = fkey_d['name']
- constrained_columns = fkey_d['constrained_columns']
- referred_schema = fkey_d['referred_schema']
- referred_table = fkey_d['referred_table']
- referred_columns = fkey_d['referred_columns']
- for (i, ref_col) in enumerate(referred_columns):
- if not table.schema and self._denormalize_name(referred_schema) == self._denormalize_name(owner):
- t = sa_schema.Table(referred_table, table.metadata, autoload=True, autoload_with=connection, oracle_resolve_synonyms=resolve_synonyms, useexisting=True)
-
- refspec = ".".join([referred_table, ref_col])
- else:
- refspec = '.'.join([x for x in [referred_schema,
- referred_table, ref_col] if x is not None])
-
- t = sa_schema.Table(referred_table, table.metadata, autoload=True, autoload_with=connection, schema=referred_schema, oracle_resolve_synonyms=resolve_synonyms, useexisting=True)
- refspecs.append(refspec)
- table.append_constraint(
- sa_schema.ForeignKeyConstraint(constrained_columns, refspecs,
- name=conname, link_to_name=True))
+ # Oracle-specific reflection (synonym resolution, dblink handling) now
+ # runs via the generic Inspector, which pulls the same data through the
+ # dialect's get_columns/get_primary_keys/get_foreign_keys hooks.
+ insp = reflection.Inspector.from_engine(connection)
+ return insp.reflecttable(table, include_columns)
class _OuterJoinColumn(sql.ClauseElement):
util.warn("Did not recognize type '%s' of column '%s'" %
(attype, name))
coltype = sqltypes.NULLTYPE
- colargs = []
+ # adjust the default value
+ if default is not None:
+ match = re.search(r"""(nextval\(')([^']+)('.*$)""", default)
+ if match is not None:
+ # the default is related to a Sequence
+ sch = schema
+ if '.' not in match.group(2) and sch is not None:
+ # unconditionally quote the schema name. this could
+ # later be enhanced to obey quoting rules / "quote schema"
+ default = match.group(1) + ('"%s"' % sch) + '.' + match.group(2) + match.group(3)
+
column_info = dict(name=name, type=coltype, nullable=nullable,
- default=default, colargs=colargs)
+ default=default, attrs={})
columns.append(column_info)
return columns
return indexes
def reflecttable(self, connection, table, include_columns):
- preparer = self.identifier_preparer
- schema = table.schema
- table_name = table.name
- info_cache = {}
- # Py2K
- if isinstance(schema, str):
- schema = schema.decode(self.encoding)
- if isinstance(table_name, str):
- table_name = table_name.decode(self.encoding)
- # end Py2K
- for col_d in self.get_columns(connection, table_name, schema,
- info_cache=info_cache):
- name = col_d['name']
- coltype = col_d['type']
- nullable = col_d['nullable']
- default = col_d['default']
- colargs = col_d['colargs']
- if include_columns and name not in include_columns:
- continue
- if default is not None:
- match = re.search(r"""(nextval\(')([^']+)('.*$)""", default)
- if match is not None:
- # the default is related to a Sequence
- sch = schema
- if '.' not in match.group(2) and sch is not None:
- # unconditionally quote the schema name. this could
- # later be enhanced to obey quoting rules / "quote schema"
- default = match.group(1) + ('"%s"' % sch) + '.' + match.group(2) + match.group(3)
- colargs.append(sa_schema.DefaultClause(sql.text(default)))
- table.append_column(sa_schema.Column(name, coltype, nullable=nullable, *colargs))
- # Now we have the table oid cached.
- table_oid = self.get_table_oid(connection, table_name, schema,
- info_cache=info_cache)
- # Primary keys
- for pk in self.get_primary_keys(connection, table_name, schema,
- info_cache=info_cache):
- if pk in table.c:
- col = table.c[pk]
- table.primary_key.add(col)
- if col.default is None:
- col.autoincrement = False
- # Foreign keys
- fkeys = self.get_foreign_keys(connection, table_name, schema,
- info_cache=info_cache)
- for fkey_d in fkeys:
- conname = fkey_d['name']
- constrained_columns = fkey_d['constrained_columns']
- referred_schema = fkey_d['referred_schema']
- referred_table = fkey_d['referred_table']
- referred_columns = fkey_d['referred_columns']
- refspec = []
- if referred_schema is not None:
- sa_schema.Table(referred_table, table.metadata, autoload=True, schema=referred_schema,
- autoload_with=connection)
- for column in referred_columns:
- refspec.append(".".join([referred_schema, referred_table, column]))
- else:
- sa_schema.Table(referred_table, table.metadata, autoload=True, autoload_with=connection)
- for column in referred_columns:
- refspec.append(".".join([referred_table, column]))
-
- table.append_constraint(sa_schema.ForeignKeyConstraint(constrained_columns, refspec, conname, link_to_name=True))
-
- # Indexes
- indexes = self.get_indexes(connection, table_name, schema,
- info_cache=info_cache)
- for index_d in indexes:
- name = index_d['name']
- columns = index_d['column_names']
- unique = index_d['unique']
- sa_schema.Index(name, *[table.columns[c] for c in columns],
- **dict(unique=unique))
+ # The sequence-default rewriting removed here moved into this dialect's
+ # get_columns (see the nextval() handling added above); reflection is
+ # now performed by the generic Inspector.
+ insp = reflection.Inspector.from_engine(connection)
+ return insp.reflecttable(table, include_columns)
def _load_domains(self, connection):
## Load data types for domains:
if args is not None:
args = re.findall(r'(\d+)', args)
coltype = coltype(*[int(a) for a in args])
- colargs = []
- if has_default:
- colargs.append(DefaultClause(sql.text(default)))
columns.append({
'name' : name,
'type' : coltype,
'nullable' : nullable,
'default' : default,
- 'colargs' : colargs,
+ 'attrs' : {},
'primary_key': primary_key
})
return columns
return unique_indexes
def reflecttable(self, connection, table, include_columns):
- preparer = self.identifier_preparer
- table_name = table.name
- schema = table.schema
- found_table = False
-
- info_cache = {}
-
- # columns
- for column in self.get_columns(connection, table_name, schema,
- info_cache=info_cache):
- name = column['name']
- coltype = column['type']
- nullable = column['nullable']
- default = column['default']
- colargs = column['colargs']
- primary_key = column['primary_key']
- found_table = True
- if include_columns and name not in include_columns:
- continue
- table.append_column(sa_schema.Column(name, coltype, primary_key = primary_key, nullable = nullable, *colargs))
- if not found_table:
- raise exc.NoSuchTableError(table.name)
-
- # foreign keys
- for fkey_d in self.get_foreign_keys(connection, table_name, schema,
- info_cache=info_cache):
-
- rtbl = fkey_d['referred_table']
- rcols = fkey_d['referred_columns']
- lcols = fkey_d['constrained_columns']
- # look up the table based on the given table's engine, not 'self',
- # since it could be a ProxyEngine
- remotetable = sa_schema.Table(rtbl, table.metadata, autoload=True, autoload_with=connection)
- refspecs = ["%s.%s" % (rtbl, rcol) for rcol in rcols]
- table.append_constraint(sa_schema.ForeignKeyConstraint(lcols, refspecs, link_to_name=True))
- # this doesn't do anything ???
- unique_indexes = self.get_unique_indexes(connection, table_name,
- schema, info_cache=info_cache)
-
+ # Hand-rolled reflection (including the NoSuchTableError probe above)
+ # is replaced by delegation to the generic Inspector.
+ # NOTE(review): the removed code raised NoSuchTableError when no
+ # columns were found -- confirm the Inspector path preserves that.
+ insp = reflection.Inspector.from_engine(connection)
+ return insp.reflecttable(table, include_columns)
def _pragma_cursor(cursor):
"""work around SQLite issue whereby cursor.description is blank when PRAGMA returns no rows."""
"""
import sqlalchemy
+from sqlalchemy import sql
from sqlalchemy import util
from sqlalchemy.types import TypeEngine
+from sqlalchemy import schema as sa_schema
@util.decorator
tnames = ordered_tnames
return tnames
+ def get_table_options(self, table_name, schema=None, **kw):
+ # Return dialect-specific options for `table_name`, passing through
+ # the shared info_cache; dialects without get_table_options support
+ # yield an empty dict rather than raising.
+ if hasattr(self.dialect, 'get_table_options'):
+ return self.dialect.get_table_options(self.conn, table_name, schema,
+ info_cache=self.info_cache,
+ **kw)
+ return {}
+
def get_view_names(self, schema=None):
"""Return all view names in `schema`.
return self.dialect.get_view_definition(
self.conn, view_name, schema, info_cache=self.info_cache)
- def get_columns(self, table_name, schema=None):
+ def get_columns(self, table_name, schema=None, **kw):
"""Return information about columns in `table_name`.
Given a string `table_name` and an optional string `schema`, return
dict containing optional column attributes
"""
- col_defs = self.dialect.get_columns(self.conn, table_name,
- schema,
- info_cache=self.info_cache)
+ col_defs = self.dialect.get_columns(self.conn, table_name, schema,
+ info_cache=self.info_cache,
+ **kw)
for col_def in col_defs:
# make this easy and only return instances for coltype
coltype = col_def['type']
col_def['type'] = coltype()
return col_defs
- def get_primary_keys(self, table_name, schema=None):
+ def get_primary_keys(self, table_name, schema=None, **kw):
"""Return information about primary keys in `table_name`.
Given a string `table_name`, and an optional string `schema`, return
primary key information as a list of column names.
"""
- pkeys = self.dialect.get_primary_keys(self.conn, table_name,
- schema,
- info_cache=self.info_cache)
+ pkeys = self.dialect.get_primary_keys(self.conn, table_name, schema,
+ info_cache=self.info_cache,
+ **kw)
return pkeys
- def get_foreign_keys(self, table_name, schema=None):
+ def get_foreign_keys(self, table_name, schema=None, **kw):
"""Return information about foreign_keys in `table_name`.
Given a string `table_name`, and an optional string `schema`, return
constrained_columns
"""
- fk_defs = self.dialect.get_foreign_keys(self.conn, table_name,
- schema,
- info_cache=self.info_cache)
- for fk_def in fk_defs:
- referred_schema = fk_def['referred_schema']
- # always set the referred_schema.
- if referred_schema is None and schema is None:
- try:
- referred_schema = self.dialect.get_default_schema_name(
- self.conn)
- fk_def['referred_schema'] = referred_schema
- except NotImplementedError:
- pass
+ fk_defs = self.dialect.get_foreign_keys(self.conn, table_name, schema,
+ info_cache=self.info_cache,
+ **kw)
return fk_defs
def get_indexes(self, table_name, schema=None):
schema,
info_cache=self.info_cache)
return indexes
+
+ def reflecttable(self, table, include_columns):
+ # Populate `table` in place -- columns, primary key, foreign keys and
+ # indexes -- using the dialect's get_* reflection hooks. If
+ # `include_columns` is non-empty, only those columns (and keys/indexes
+ # fully covered by them) are reflected.
+
+ # for some work arounds
+ # NOTE(review): importing MySQLDialect here couples the generic
+ # inspector to one dialect -- presumably done locally to avoid an
+ # import cycle; TODO confirm and remove the special-casing below.
+ from sqlalchemy.dialects.mysql.mysqldb import MySQLDialect
+
+ dialect = self.conn.dialect
+
+ # MySQL dialect does this. Applicable with other dialects?
+ if hasattr(dialect, '_connection_charset') \
+ and hasattr(dialect, '_adjust_casing'):
+ charset = dialect._connection_charset
+ dialect._adjust_casing(table)
+
+ # table attributes we might need.
+ oracle_resolve_synonyms = table.kwargs.get('oracle_resolve_synonyms',
+ False)
+
+ # oracle stuff; could be made for generic synonym support.
+ (actual_name, owner, dblink, synonym) = (None, None, None, None)
+ if oracle_resolve_synonyms:
+ (actual_name, owner, dblink, synonym) = dialect._resolve_synonym(
+ self.conn,
+ desired_owner=dialect._denormalize_name(table.schema),
+ desired_synonym=dialect._denormalize_name(table.name)
+ )
+
+ # some properties that need to be figured out
+ fk_use_existing = True
+
+ schema = table.schema
+ table_name = table.name
+
+ # apply table options
+ tbl_opts = self.get_table_options(table_name, schema, **table.kwargs)
+ if tbl_opts:
+ table.kwargs.update(tbl_opts)
+
+ # table.kwargs will need to be passed to each reflection method. Make
+ # sure keywords are strings.
+ # (Python 2 requires **kwargs keys to be `str`, not `unicode`.)
+ tblkw = table.kwargs.copy()
+ for (k, v) in tblkw.items():
+ del tblkw[k]
+ tblkw[str(k)] = v
+
+ # Py2K
+ # NOTE(review): uses `self.dialect` here but the local `dialect`
+ # elsewhere -- confirm both refer to the same object.
+ if isinstance(schema, str):
+ schema = schema.decode(self.dialect.encoding)
+ if isinstance(table_name, str):
+ table_name = table_name.decode(self.dialect.encoding)
+ # end Py2K
+ # columns
+ for col_d in self.get_columns(table_name, schema, **tblkw):
+ name = col_d['name']
+ coltype = col_d['type']
+ nullable = col_d['nullable']
+ default = col_d['default']
+ attrs = col_d['attrs']
+ # construct additional colargs with attrs
+ # currently, it's not used here.
+ colargs = []
+ col_kw = {}
+ if 'autoincrement' in col_d:
+ col_kw['autoincrement'] = col_d['autoincrement']
+ if include_columns and name not in include_columns:
+ continue
+ if default is not None:
+ # fixme
+ # mysql does not use sql.text
+ if isinstance(dialect, MySQLDialect):
+ colargs.append(sa_schema.DefaultClause(default))
+ else:
+ colargs.append(sa_schema.DefaultClause(sql.text(default)))
+ table.append_column(sa_schema.Column(name, coltype,
+ nullable=nullable, *colargs, **col_kw))
+ # Primary keys
+ for pk in self.get_primary_keys(table_name, schema, **tblkw):
+ if pk in table.c:
+ col = table.c[pk]
+ table.primary_key.add(col)
+ # fixme
+ if not isinstance(dialect, MySQLDialect):
+ if col.default is None:
+ col.autoincrement = False
+ # Foreign keys
+ # Each referred table is autoloaded into the same MetaData so the
+ # link_to_name refspec strings below can resolve.
+ fkeys = self.get_foreign_keys(table_name, schema, **tblkw)
+ for fkey_d in fkeys:
+ conname = fkey_d['name']
+ constrained_columns = fkey_d['constrained_columns']
+ referred_schema = fkey_d['referred_schema']
+ referred_table = fkey_d['referred_table']
+ referred_columns = fkey_d['referred_columns']
+ refspec = []
+ if referred_schema is not None:
+ sa_schema.Table(referred_table, table.metadata,
+ autoload=True, schema=referred_schema,
+ autoload_with=self.conn,
+ oracle_resolve_synonyms=oracle_resolve_synonyms,
+ useexisting=fk_use_existing
+ )
+ for column in referred_columns:
+ refspec.append(".".join(
+ [referred_schema, referred_table, column]))
+ else:
+ sa_schema.Table(referred_table, table.metadata, autoload=True,
+ autoload_with=self.conn,
+ oracle_resolve_synonyms=oracle_resolve_synonyms,
+ useexisting=fk_use_existing
+ )
+ for column in referred_columns:
+ refspec.append(".".join([referred_table, column]))
+ table.append_constraint(
+ sa_schema.ForeignKeyConstraint(constrained_columns, refspec,
+ conname, link_to_name=True))
+ # Indexes
+ # NOTE(review): unlike the other get_* calls above, tblkw is not
+ # passed here -- confirm index reflection needs no table kwargs.
+ indexes = self.get_indexes(table_name, schema)
+ for index_d in indexes:
+ name = index_d['name']
+ columns = index_d['column_names']
+ unique = index_d['unique']
+ flavor = index_d.get('type', 'unknown type')
+ if include_columns and \
+ not set(columns).issubset(include_columns):
+ self.logger.info(
+ "Omitting %s KEY for (%s), key covers ommitted columns." %
+ (flavor, ', '.join(columns)))
+ continue
+ sa_schema.Index(name, *[table.columns[c] for c in columns],
+ **dict(unique=unique))
import testenv; testenv.configure_for_tests()
import StringIO, unicodedata
import sqlalchemy as sa
+from sqlalchemy import types as sql_types
from sqlalchemy import schema
from sqlalchemy.engine.reflection import Inspector
from testlib.sa import MetaData, Table, Column
# Tests related to engine.reflection
def get_schema():
+ if testing.against('oracle'):
+ return 'scott'
return 'test_schema'
def createTables(meta, schema=None):
insp = Inspector(meta.bind)
try:
expected_schema = schema
- if schema is None:
- try:
- expected_schema = meta.bind.dialect.get_default_schema_name(
- meta.bind)
- except NotImplementedError:
- expected_schema = None
# users
users_fkeys = insp.get_foreign_keys(users.name,
schema=schema)