def visit_sequence(self, seq):
return self.execute_string("SELECT " +
self.dialect.identifier_preparer.format_sequence(seq) +
- ".nextval FROM DUAL", {})
+ ".nextval FROM DUAL", ())
class OracleIdentifierPreparer(compiler.IdentifierPreparer):
def has_table(self, connection, table_name, schema=None):
if not schema:
schema = self.get_default_schema_name(connection)
- cursor = connection.execute("""select table_name from all_tables where table_name=:name and owner=:schema_name""", {'name':self.denormalize_name(table_name), 'schema_name':self.denormalize_name(schema)})
+ cursor = connection.execute(
+ sql.text("SELECT table_name FROM all_tables "
+ "WHERE table_name = :name AND owner = :schema_name"),
+ name=self.denormalize_name(table_name), schema_name=self.denormalize_name(schema))
return cursor.fetchone() is not None
def has_sequence(self, connection, sequence_name, schema=None):
if not schema:
schema = self.get_default_schema_name(connection)
- cursor = connection.execute("""select sequence_name from all_sequences where sequence_name=:name and sequence_owner=:schema_name""", {'name':self.denormalize_name(sequence_name), 'schema_name':self.denormalize_name(schema)})
+ cursor = connection.execute(
+ sql.text("SELECT sequence_name FROM all_sequences "
+ "WHERE sequence_name = :name AND sequence_owner = :schema_name"),
+ name=self.denormalize_name(sequence_name), schema_name=self.denormalize_name(schema))
return cursor.fetchone() is not None
def normalize_name(self, name):
if name is None:
return None
- elif name.upper() == name and not self.identifier_preparer._requires_quotes(name.lower().decode(self.encoding)):
+ elif (name.upper() == name and
+ not self.identifier_preparer._requires_quotes(name.lower().decode(self.encoding))):
return name.lower().decode(self.encoding)
else:
return name.decode(self.encoding)
def table_names(self, connection, schema):
# note that table_names() isnt loading DBLINKed or synonym'ed tables
if schema is None:
- s = "select table_name from all_tables where nvl(tablespace_name, 'no tablespace') NOT IN ('SYSTEM', 'SYSAUX')"
- cursor = connection.execute(s)
+ cursor = connection.execute(
+ "SELECT table_name FROM all_tables "
+ "WHERE nvl(tablespace_name, 'no tablespace') NOT IN ('SYSTEM', 'SYSAUX')")
else:
- s = "select table_name from all_tables where nvl(tablespace_name, 'no tablespace') NOT IN ('SYSTEM','SYSAUX') AND OWNER = :owner"
- cursor = connection.execute(s, {'owner': self.denormalize_name(schema)})
+ s = sql.text(
+ "SELECT table_name FROM all_tables "
+ "WHERE nvl(tablespace_name, 'no tablespace') NOT IN ('SYSTEM', 'SYSAUX') "
+ "AND OWNER = :owner")
+ cursor = connection.execute(s, owner=self.denormalize_name(schema))
return [self.normalize_name(row[0]) for row in cursor]
def _resolve_synonym(self, connection, desired_owner=None, desired_synonym=None, desired_table=None):
returns the actual name, owner, dblink name, and synonym name if found.
"""
- sql = """select OWNER, TABLE_OWNER, TABLE_NAME, DB_LINK, SYNONYM_NAME
- from ALL_SYNONYMS WHERE """
-
+ q = "SELECT owner, table_owner, table_name, db_link, synonym_name FROM all_synonyms WHERE "
clauses = []
params = {}
if desired_synonym:
- clauses.append("SYNONYM_NAME=:synonym_name")
+ clauses.append("synonym_name = :synonym_name")
params['synonym_name'] = desired_synonym
if desired_owner:
- clauses.append("TABLE_OWNER=:desired_owner")
+ clauses.append("table_owner = :desired_owner")
params['desired_owner'] = desired_owner
if desired_table:
- clauses.append("TABLE_NAME=:tname")
+ clauses.append("table_name = :tname")
params['tname'] = desired_table
- sql += " AND ".join(clauses)
+ q += " AND ".join(clauses)
- result = connection.execute(sql, **params)
+ result = connection.execute(sql.text(q), **params)
if desired_owner:
row = result.fetchone()
if row:
- return row['TABLE_NAME'], row['TABLE_OWNER'], row['DB_LINK'], row['SYNONYM_NAME']
+ return row['table_name'], row['table_owner'], row['db_link'], row['synonym_name']
else:
return None, None, None, None
else:
raise AssertionError("There are multiple tables visible to the schema, you must specify owner")
elif len(rows) == 1:
row = rows[0]
- return row['TABLE_NAME'], row['TABLE_OWNER'], row['DB_LINK'], row['SYNONYM_NAME']
+ return row['table_name'], row['table_owner'], row['db_link'], row['synonym_name']
else:
return None, None, None, None
@reflection.cache
def get_view_names(self, connection, schema=None, **kw):
schema = self.denormalize_name(schema or self.get_default_schema_name(connection))
- s = "select view_name from all_views where OWNER = :owner"
- cursor = connection.execute(s,
- {'owner':self.denormalize_name(schema)})
+ s = sql.text("SELECT view_name FROM all_views WHERE owner = :owner")
+ cursor = connection.execute(s, owner=self.denormalize_name(schema))
return [self.normalize_name(row[0]) for row in cursor]
@reflection.cache
resolve_synonyms, dblink,
info_cache=info_cache)
columns = []
- c = connection.execute ("select COLUMN_NAME, DATA_TYPE, DATA_LENGTH, DATA_PRECISION, "
- "DATA_SCALE, NULLABLE, DATA_DEFAULT from ALL_TAB_COLUMNS%(dblink)s "
- "where TABLE_NAME = :table_name and OWNER = :owner" %
- {'dblink':dblink}, {'table_name':table_name, 'owner':schema}
- )
+ c = connection.execute(sql.text(
+ "SELECT column_name, data_type, data_length, data_precision, data_scale, "
+ "nullable, data_default FROM ALL_TAB_COLUMNS%(dblink)s "
+ "WHERE table_name = :table_name AND owner = :owner" % {'dblink': dblink}),
+ table_name=table_name, owner=schema)
for row in c:
-
(colname, coltype, length, precision, scale, nullable, default) = \
(self.normalize_name(row[0]), row[1], row[2], row[3], row[4], row[5]=='Y', row[6])
resolve_synonyms, dblink,
info_cache=info_cache)
indexes = []
- q = """
- SELECT a.INDEX_NAME, a.COLUMN_NAME, b.UNIQUENESS
+ q = sql.text("""
+ SELECT a.index_name, a.column_name, b.uniqueness
FROM ALL_IND_COLUMNS%(dblink)s a
INNER JOIN ALL_INDEXES%(dblink)s b
- ON a.INDEX_NAME = b.INDEX_NAME
- AND a.TABLE_OWNER = b.TABLE_OWNER
- AND a.TABLE_NAME = b.TABLE_NAME
- WHERE a.TABLE_NAME = :table_name
- AND a.TABLE_OWNER = :schema
- ORDER BY a.INDEX_NAME, a.COLUMN_POSITION
- """ % dict(dblink=dblink)
- rp = connection.execute(q,
- dict(table_name=self.denormalize_name(table_name),
- schema=self.denormalize_name(schema)))
+ ON a.index_name = b.index_name
+ AND a.table_owner = b.table_owner
+ AND a.table_name = b.table_name
+ WHERE a.table_name = :table_name
+ AND a.table_owner = :schema
+ ORDER BY a.index_name, a.column_position""" % {'dblink': dblink})
+ rp = connection.execute(q, table_name=self.denormalize_name(table_name),
+ schema=self.denormalize_name(schema))
indexes = []
last_index_name = None
pkeys = self.get_primary_keys(connection, table_name, schema,
def _get_constraint_data(self, connection, table_name, schema=None,
dblink='', **kw):
- rp = connection.execute("""SELECT
+ rp = connection.execute(
+ sql.text("""SELECT
ac.constraint_name,
ac.constraint_type,
loc.column_name AS local_column,
AND ac.r_owner = rem.owner(+)
AND ac.r_constraint_name = rem.constraint_name(+)
AND (rem.position IS NULL or loc.position=rem.position)
- ORDER BY ac.constraint_name, loc.position"""
-
- % {'dblink':dblink}, {'table_name' : table_name, 'owner' : schema})
+ ORDER BY ac.constraint_name, loc.position""" % {'dblink': dblink}),
+ table_name=table_name, owner=schema)
constraint_data = rp.fetchall()
return constraint_data
self._prepare_reflection_args(connection, view_name, schema,
resolve_synonyms, dblink,
info_cache=info_cache)
- s = """
+ s = sql.text("""
SELECT text FROM all_views
WHERE owner = :schema
AND view_name = :view_name
- """
+ """)
rp = connection.execute(s,
view_name=view_name, schema=schema).scalar()
if rp: