if resolve_synonyms:
actual_name, owner, dblink, synonym = self._resolve_synonym(
- connection,
- desired_owner=self.denormalize_name(schema),
- desired_synonym=self.denormalize_name(table_name)
- )
+ connection,
+ desired_owner=self.denormalize_name(schema),
+ desired_synonym=self.denormalize_name(table_name)
+ )
else:
actual_name, owner, dblink, synonym = None, None, None, None
if not actual_name:
actual_name = self.denormalize_name(table_name)
- if not dblink:
- dblink = ''
- if not owner:
+
+ if dblink:
+ # using user_db_links here since all_db_links appears
+ # to have more restricted permissions.
+ # http://docs.oracle.com/cd/B28359_01/server.111/b28310/ds_admin005.htm
+ # will need to hear from more users if we are doing
+ # the right thing here. See [ticket:2619]
+ owner = connection.scalar(
+ sql.text("SELECT username FROM user_db_links "
+ "WHERE db_link=:link"), link=dblink)
+ dblink = "@" + dblink
+ elif not owner:
owner = self.denormalize_name(schema or self.default_schema_name)
- return (actual_name, owner, dblink, synonym)
+
+ return (actual_name, owner, dblink or '', synonym)
@reflection.cache
def get_schema_names(self, connection, **kw):
else:
char_length_col = 'data_length'
- c = connection.execute(sql.text(
- "SELECT column_name, data_type, %(char_length_col)s, data_precision, data_scale, "
- "nullable, data_default FROM ALL_TAB_COLUMNS%(dblink)s "
- "WHERE table_name = :table_name AND owner = :owner "
- "ORDER BY column_id" % {'dblink': dblink, 'char_length_col': char_length_col}),
- table_name=table_name, owner=schema)
+ params = {"table_name": table_name}
+ text = "SELECT column_name, data_type, %(char_length_col)s, "\
+ "data_precision, data_scale, "\
+ "nullable, data_default FROM ALL_TAB_COLUMNS%(dblink)s "\
+ "WHERE table_name = :table_name"
+ if schema is not None:
+ params['owner'] = schema
+ text += " AND owner = :owner "
+ text += " ORDER BY column_id"
+ text = text % {'dblink': dblink, 'char_length_col': char_length_col}
+
+ c = connection.execute(sql.text(text), **params)
for row in c:
(colname, orig_colname, coltype, length, precision, scale, nullable, default) = \
resolve_synonyms, dblink,
info_cache=info_cache)
indexes = []
- q = sql.text("""
- SELECT a.index_name, a.column_name, b.uniqueness
- FROM ALL_IND_COLUMNS%(dblink)s a,
- ALL_INDEXES%(dblink)s b
- WHERE
- a.index_name = b.index_name
- AND a.table_owner = b.table_owner
- AND a.table_name = b.table_name
-
- AND a.table_name = :table_name
- AND a.table_owner = :schema
- ORDER BY a.index_name, a.column_position""" % {'dblink': dblink})
- rp = connection.execute(q, table_name=self.denormalize_name(table_name),
- schema=self.denormalize_name(schema))
+
+ params = {'table_name': table_name}
+ text = \
+ "SELECT a.index_name, a.column_name, b.uniqueness "\
+ "\nFROM ALL_IND_COLUMNS%(dblink)s a, "\
+ "\nALL_INDEXES%(dblink)s b "\
+ "\nWHERE "\
+ "\na.index_name = b.index_name "\
+ "\nAND a.table_owner = b.table_owner "\
+ "\nAND a.table_name = b.table_name "\
+ "\nAND a.table_name = :table_name "
+
+ if schema is not None:
+ params['schema'] = schema
+ text += "AND a.table_owner = :schema "
+
+ text += "ORDER BY a.index_name, a.column_position"
+
+ text = text % {'dblink': dblink}
+
+ q = sql.text(text)
+ rp = connection.execute(q, **params)
indexes = []
last_index_name = None
pk_constraint = self.get_pk_constraint(
def _get_constraint_data(self, connection, table_name, schema=None,
dblink='', **kw):
- rp = connection.execute(
- sql.text("""SELECT
- ac.constraint_name,
- ac.constraint_type,
- loc.column_name AS local_column,
- rem.table_name AS remote_table,
- rem.column_name AS remote_column,
- rem.owner AS remote_owner,
- loc.position as loc_pos,
- rem.position as rem_pos
- FROM all_constraints%(dblink)s ac,
- all_cons_columns%(dblink)s loc,
- all_cons_columns%(dblink)s rem
- WHERE ac.table_name = :table_name
- AND ac.constraint_type IN ('R','P')
- AND ac.owner = :owner
- AND ac.owner = loc.owner
- AND ac.constraint_name = loc.constraint_name
- AND ac.r_owner = rem.owner(+)
- AND ac.r_constraint_name = rem.constraint_name(+)
- AND (rem.position IS NULL or loc.position=rem.position)
- ORDER BY ac.constraint_name, loc.position""" % {'dblink': dblink}),
- table_name=table_name, owner=schema)
+ params = {'table_name': table_name}
+
+ text = \
+ "SELECT"\
+ "\nac.constraint_name,"\
+ "\nac.constraint_type,"\
+ "\nloc.column_name AS local_column,"\
+ "\nrem.table_name AS remote_table,"\
+ "\nrem.column_name AS remote_column,"\
+ "\nrem.owner AS remote_owner,"\
+ "\nloc.position as loc_pos,"\
+ "\nrem.position as rem_pos"\
+ "\nFROM all_constraints%(dblink)s ac,"\
+ "\nall_cons_columns%(dblink)s loc,"\
+ "\nall_cons_columns%(dblink)s rem"\
+ "\nWHERE ac.table_name = :table_name"\
+ "\nAND ac.constraint_type IN ('R','P')"
+
+ if schema is not None:
+ params['owner'] = schema
+ text += "\nAND ac.owner = :owner"
+
+ text += \
+ "\nAND ac.owner = loc.owner"\
+ "\nAND ac.constraint_name = loc.constraint_name"\
+ "\nAND ac.r_owner = rem.owner(+)"\
+ "\nAND ac.r_constraint_name = rem.constraint_name(+)"\
+ "\nAND (rem.position IS NULL or loc.position=rem.position)"\
+ "\nORDER BY ac.constraint_name, loc.position"
+
+ text = text % {'dblink': dblink}
+ rp = connection.execute(sql.text(text), **params)
constraint_data = rp.fetchall()
return constraint_data
self._prepare_reflection_args(connection, view_name, schema,
resolve_synonyms, dblink,
info_cache=info_cache)
- s = sql.text("""
- SELECT text FROM all_views
- WHERE owner = :schema
- AND view_name = :view_name
- """)
- rp = connection.execute(s,
- view_name=view_name, schema=schema).scalar()
+
+ params = {'view_name': view_name}
+ text = "SELECT text FROM all_views WHERE view_name=:view_name"
+
+ if schema is not None:
+ text += " AND owner = :schema"
+ params['schema'] = schema
+
+ rp = connection.execute(sql.text(text), **params).scalar()
if rp:
return rp.decode(self.encoding)
else: