which is set as the escape character using the syntax
"x LIKE y ESCAPE '<somestring>'" [ticket:993],
[ticket:791]
-
+
+- oracle
+ - the "owner" keyword on Table is now deprecated, and is
+ exactly synonymous with the "schema" keyword. Tables
+ can now be reflected with alternate "owner" attributes,
+ explicitly stated on the Table object or not using
+ "schema".
+
+ - all of the "magic" searching for synonyms, DBLINKs etc.
+ during table reflection
+ are disabled by default unless you specify
+ "oracle_resolve_synonyms=True" on the Table object.
+ Resolving synonyms necessarily leads to some messy
+ guessing which we'd rather leave off by default.
+ When the flag is set, tables and related tables
+ will be resolved against synonyms in all cases, meaning
+ if a synonym exists for a particular table, reflection
+ will use it when reflecting related tables. This is
+ stickier behavior than before which is why it's
+ off by default.
+
+
+
- extensions
- The "synonym" function is now directly usable with
"declarative". Pass in the decorated property using the
cursor = connection.execute("""select sequence_name from all_sequences where sequence_name=:name""", {'name':self._denormalize_name(sequence_name)})
return bool( cursor.fetchone() is not None )
- def _locate_owner_row(self, owner, name, rows, raiseerr=False):
- """return the row in the given list of rows which references the given table name and owner name."""
- if not rows:
- if raiseerr:
- raise exceptions.NoSuchTableError(name)
- else:
- return None
- else:
- if owner is not None:
- for row in rows:
- if owner.upper() in row[0]:
- return row
- else:
- if raiseerr:
- raise exceptions.AssertionError("Specified owner %s does not own table %s" % (owner, name))
- else:
- return None
- else:
- if len(rows)==1:
- return rows[0]
- else:
- if raiseerr:
- raise exceptions.AssertionError("There are multiple tables with name '%s' visible to the schema, you must specifiy owner" % name)
- else:
- return None
-
- def _resolve_table_owner(self, connection, name, table, dblink=''):
- """Locate the given table in the ``ALL_TAB_COLUMNS`` view,
- including searching for equivalent synonyms and dblinks.
- """
-
- c = connection.execute ("select distinct OWNER from ALL_TAB_COLUMNS%(dblink)s where TABLE_NAME = :table_name" % {'dblink':dblink}, {'table_name':name})
- rows = c.fetchall()
- try:
- row = self._locate_owner_row(table.owner, name, rows, raiseerr=True)
- return name, row['OWNER'], ''
- except exceptions.SQLAlchemyError:
- # locate synonyms
- c = connection.execute ("""select OWNER, TABLE_OWNER, TABLE_NAME, DB_LINK
- from ALL_SYNONYMS%(dblink)s
- where SYNONYM_NAME = :synonym_name
- and (DB_LINK IS NOT NULL
- or ((TABLE_NAME, TABLE_OWNER) in
- (select TABLE_NAME, OWNER from ALL_TAB_COLUMNS%(dblink)s)))""" % {'dblink':dblink},
- {'synonym_name':name})
- rows = c.fetchall()
- row = self._locate_owner_row(table.owner, name, rows)
- if row is None:
- row = self._locate_owner_row("PUBLIC", name, rows)
-
- if row is not None:
- owner, name, dblink = row['TABLE_OWNER'], row['TABLE_NAME'], row['DB_LINK']
- if dblink:
- dblink = '@' + dblink
- if not owner:
- # re-resolve table owner using new dblink variable
- t1, owner, t2 = self._resolve_table_owner(connection, name, table, dblink=dblink)
- else:
- dblink = ''
- return name, owner, dblink
- raise
-
def _normalize_name(self, name):
if name is None:
return None
cursor = connection.execute(s,{'owner':self._denormalize_name(schema)})
return [self._normalize_name(row[0]) for row in cursor]
+ def _resolve_synonym(self, connection, desired_owner=None, desired_synonym=None, desired_table=None):
+ """search for a local synonym matching the given desired owner/name.
+
+ if desired_owner is None, attempts to locate a distinct owner.
+
+ returns the actual name, owner, dblink name, and synonym name if found.
+ """
+
+ sql = """select OWNER, TABLE_OWNER, TABLE_NAME, DB_LINK, SYNONYM_NAME
+ from ALL_SYNONYMS WHERE """
+
+ clauses = []
+ params = {}
+ if desired_synonym:
+ clauses.append("SYNONYM_NAME=:synonym_name")
+ params['synonym_name'] = desired_synonym
+ if desired_owner:
+ clauses.append("TABLE_OWNER=:desired_owner")
+ params['desired_owner'] = desired_owner
+ if desired_table:
+ clauses.append("TABLE_NAME=:tname")
+ params['tname'] = desired_table
+
+ sql += " AND ".join(clauses)
+
+ result = connection.execute(sql, **params)
+ if desired_owner:
+ row = result.fetchone()
+ if row:
+ return row['TABLE_NAME'], row['TABLE_OWNER'], row['DB_LINK'], row['SYNONYM_NAME']
+ else:
+ return None, None, None, None
+ else:
+ rows = result.fetchall()
+ if len(rows) > 1:
+ raise exceptions.AssertionError("There are multiple tables with name '%s' visible to the schema, you must specify owner" % name)
+ elif len(rows) == 1:
+ row = rows[0]
+ return row['TABLE_NAME'], row['TABLE_OWNER'], row['DB_LINK'], row['SYNONYM_NAME']
+ else:
+ return None, None, None, None
+
def reflecttable(self, connection, table, include_columns):
preparer = self.identifier_preparer
- # search for table, including across synonyms and dblinks.
- # locate the actual name of the table, the real owner, and any dblink clause needed.
- actual_name, owner, dblink = self._resolve_table_owner(connection, self._denormalize_name(table.name), table)
+ resolve_synonyms = table.kwargs.get('oracle_resolve_synonyms', False)
- c = connection.execute ("select COLUMN_NAME, DATA_TYPE, DATA_LENGTH, DATA_PRECISION, DATA_SCALE, NULLABLE, DATA_DEFAULT from ALL_TAB_COLUMNS%(dblink)s where TABLE_NAME = :table_name and OWNER = :owner" % {'dblink':dblink}, {'table_name':actual_name, 'owner':owner})
+ if resolve_synonyms:
+ actual_name, owner, dblink, synonym = self._resolve_synonym(connection, desired_owner=self._denormalize_name(table.schema), desired_synonym=self._denormalize_name(table.name))
+ else:
+ actual_name, owner, dblink, synonym = None, None, None, None
+
+ if not actual_name:
+ actual_name = self._denormalize_name(table.name)
+ if not dblink:
+ dblink = ''
+ if not owner:
+ owner = self._denormalize_name(table.schema) or self.get_default_schema_name(connection)
+ c = connection.execute ("select COLUMN_NAME, DATA_TYPE, DATA_LENGTH, DATA_PRECISION, DATA_SCALE, NULLABLE, DATA_DEFAULT from ALL_TAB_COLUMNS%(dblink)s where TABLE_NAME = :table_name and OWNER = :owner" % {'dblink':dblink}, {'table_name':actual_name, 'owner':owner})
while True:
row = c.fetchone()
"all_cons_columns%(dblink)s - does the user have "
"proper rights to the table?") % {'dblink':dblink})
continue
- refspec = ".".join([remote_table, remote_column])
- schema.Table(remote_table, table.metadata, autoload=True, autoload_with=connection, owner=remote_owner)
+
+ if resolve_synonyms:
+ ref_remote_name, ref_remote_owner, ref_dblink, ref_synonym = self._resolve_synonym(connection, desired_owner=self._denormalize_name(remote_owner), desired_table=self._denormalize_name(remote_table))
+ if ref_synonym:
+ remote_table = self._normalize_name(ref_synonym)
+ remote_owner = self._normalize_name(ref_remote_owner)
+
+ if not table.schema and self._denormalize_name(remote_owner) == owner:
+ refspec = ".".join([remote_table, remote_column])
+ t = schema.Table(remote_table, table.metadata, autoload=True, autoload_with=connection, oracle_resolve_synonyms=resolve_synonyms, useexisting=True)
+ else:
+ refspec = ".".join(x for x in [remote_owner, remote_table, remote_column] if x)
+ t = schema.Table(remote_table, table.metadata, autoload=True, autoload_with=connection, schema=remote_owner, oracle_resolve_synonyms=resolve_synonyms, useexisting=True)
+
if local_column not in fk[0]:
fk[0].append(local_column)
if refspec not in fk[1]:
"""A metaclass used by the ``Table`` object to provide singleton behavior."""
def __call__(self, name, metadata, *args, **kwargs):
- schema = kwargs.get('schema', None)
+ schema = kwargs.get('schema', kwargs.get('owner', None))
useexisting = kwargs.pop('useexisting', False)
mustexist = kwargs.pop('mustexist', False)
key = _get_table_key(name, schema)
constructor arguments.
owner
- Defaults to None: optional owning user of this table. useful for
- databases such as Oracle to aid in table reflection.
+ Deprecated; this is an oracle-only argument - "schema" should
+ be used in its place.
quote
Defaults to False: indicates that the Table identifier must be
super(Table, self).__init__(name)
self.metadata = metadata
- self.schema = kwargs.pop('schema', None)
- self.owner = kwargs.pop('owner', None)
+ self.schema = kwargs.pop('schema', kwargs.pop('owner', None))
self.indexes = util.Set()
self.constraints = util.Set()
self._columns = expression.ColumnCollection()
include_columns = kwargs.pop('include_columns', None)
self._set_parent(metadata)
+
+ self.__extra_kwargs(**kwargs)
+
# load column definitions from the database if 'autoload' is defined
# we do it after the table is in the singleton dictionary to support
# circular foreign keys
raise exceptions.ArgumentError(
"Can't change schema of existing table from '%s' to '%s'",
(self.schema, schema))
- owner = kwargs.pop('owner', None)
- if owner:
- if not self.owner:
- self.owner = owner
- elif owner != self.owner:
- raise exceptions.ArgumentError(
- "Can't change owner of existing table from '%s' to '%s'",
- (self.owner, owner))
include_columns = kwargs.pop('include_columns', None)
if include_columns:
for c in self.c:
if c.name not in include_columns:
self.c.remove(c)
+
+ self.__extra_kwargs(**kwargs)
self.__post_init(*args, **kwargs)
def _cant_override(self, *args, **kwargs):
return bool(args) or bool(util.Set(kwargs).difference(
['autoload', 'autoload_with', 'schema', 'owner']))
- def __post_init(self, *args, **kwargs):
+ def __extra_kwargs(self, **kwargs):
self.quote = kwargs.pop('quote', False)
self.quote_schema = kwargs.pop('quote_schema', False)
if kwargs.get('info'):
raise TypeError("Invalid argument(s) for Table: %s" % repr(kwargs.keys()))
self.kwargs.update(kwargs)
+
+ def __post_init(self, *args, **kwargs):
self._init_items(*args)
def key(self):
class CompileTest(TestBase, AssertsCompiledSQL):
__dialect__ = oracle.OracleDialect()
+ def test_owner(self):
+ meta = MetaData()
+ parent = Table('parent', meta, Column('id', Integer, primary_key=True),
+ Column('name', String(50)),
+ owner='ed')
+ child = Table('child', meta, Column('id', Integer, primary_key=True),
+ Column('parent_id', Integer, ForeignKey('ed.parent.id')),
+ owner = 'ed')
+
+ self.assert_compile(parent.join(child), "ed.parent JOIN ed.child ON parent.id = child.parent_id")
+
def test_subquery(self):
t = table('sometable', column('col1'), column('col2'))
s = select([t])
"ON addresses.address_type_id = address_types_1.id WHERE addresses.user_id = :addresses_user_id_1 ORDER BY addresses.rowid, "
"address_types.rowid")
+class SchemaReflectionTest(TestBase, AssertsCompiledSQL):
+ """instructions:
+
+ 1. create a user 'ed' in the oracle database.
+ 2. in 'ed', issue the following statements:
+ create table parent(id integer primary key, data varchar2(50));
+ create table child(id integer primary key, data varchar2(50), parent_id integer references parent(id));
+ create synonym ptable for parent;
+ create synonym ctable for child;
+ grant all on parent to scott; (or to whoever you run the oracle tests as)
+ grant all on child to scott; (same)
+ grant all on ptable to scott;
+ grant all on ctable to scott;
+
+ """
+
+ __only_on__ = 'oracle'
+
+ def test_reflect_alt_owner_explicit(self):
+ meta = MetaData(testing.db)
+ parent = Table('parent', meta, autoload=True, schema='ed')
+ child = Table('child', meta, autoload=True, schema='ed')
+
+ self.assert_compile(parent.join(child), "ed.parent JOIN ed.child ON parent.id = child.parent_id")
+ select([parent, child]).select_from(parent.join(child)).execute().fetchall()
+
+ def test_reflect_local_to_remote(self):
+ testing.db.execute("CREATE TABLE localtable (id INTEGER PRIMARY KEY, parent_id INTEGER REFERENCES ed.parent(id))")
+ try:
+ meta = MetaData(testing.db)
+ lcl = Table('localtable', meta, autoload=True)
+ parent = meta.tables['ed.parent']
+ self.assert_compile(parent.join(lcl), "ed.parent JOIN localtable ON parent.id = localtable.parent_id")
+ select([parent, lcl]).select_from(parent.join(lcl)).execute().fetchall()
+ finally:
+ testing.db.execute("DROP TABLE localtable")
+
+ def test_reflect_alt_owner_implicit(self):
+ meta = MetaData(testing.db)
+ parent = Table('parent', meta, autoload=True, schema='ed')
+ child = Table('child', meta, autoload=True, schema='ed')
+
+ self.assert_compile(parent.join(child), "ed.parent JOIN ed.child ON parent.id = child.parent_id")
+ select([parent, child]).select_from(parent.join(child)).execute().fetchall()
+
+ def test_reflect_alt_owner_synonyms(self):
+ testing.db.execute("CREATE TABLE localtable (id INTEGER PRIMARY KEY, parent_id INTEGER REFERENCES ed.ptable(id))")
+ try:
+ meta = MetaData(testing.db)
+ lcl = Table('localtable', meta, autoload=True, oracle_resolve_synonyms=True)
+ parent = meta.tables['ed.ptable']
+ self.assert_compile(parent.join(lcl), "ed.ptable JOIN localtable ON ptable.id = localtable.parent_id")
+ select([parent, lcl]).select_from(parent.join(lcl)).execute().fetchall()
+ finally:
+ testing.db.execute("DROP TABLE localtable")
+
+ def test_reflect_remote_synonyms(self):
+ meta = MetaData(testing.db)
+ parent = Table('ptable', meta, autoload=True, schema='ed', oracle_resolve_synonyms=True)
+ child = Table('ctable', meta, autoload=True, schema='ed', oracle_resolve_synonyms=True)
+ self.assert_compile(parent.join(child), "ed.ptable JOIN ed.ctable ON ptable.id = ctable.parent_id")
+ select([parent, child]).select_from(parent.join(child)).execute().fetchall()
+
+
class TypesTest(TestBase, AssertsCompiledSQL):
__only_on__ = 'oracle'
'all_types', MetaData(testing.db),
Column('owner', String(30), primary_key=True),
Column('type_name', String(30), primary_key=True),
- autoload=True,
+ autoload=True, oracle_resolve_synonyms=True
)
[[row[k] for k in row.keys()] for row in types_table.select().execute().fetchall()]
return schema.Table(*args, **kw)
-generic_counter = itertools.count()
def Column(*args, **kw):
"""A schema.Column wrapper/hook for dialect-specific tweaks."""
if schema is None:
from sqlalchemy import schema
+ test_opts = dict([(k,kw.pop(k)) for k in kw.keys()
+ if k.startswith('test_')])
+
if testing.against('oracle'):
- if kw.get('primary_key') == True and kw.get('default') == None:
- kw['default'] = generic_counter.next
+ if 'test_needs_autoincrement' in test_opts:
+ args = list(args)
+ args.append(schema.Sequence(args[0], optional=True))
return schema.Column(*args, **kw)