examining the type of column for use in special Python translations or
for migrating schemas to other database backends.
+.. _oracle_table_options:
+
Oracle Table Options
-------------------------
.. versionadded:: 1.0.0
+* ``COMPRESS``::
+
+ Table('mytable', metadata, Column('data', String(32)),
+ oracle_compress=True)
+
+ Table('mytable', metadata, Column('data', String(32)),
+ oracle_compress=6)
+
+ The ``oracle_compress`` parameter accepts either an integer compression
+ level, or ``True`` to use the default compression level.
+
+.. versionadded:: 1.0.0
+
+.. _oracle_index_options:
+
+Oracle Specific Index Options
+-----------------------------
+
+Bitmap Indexes
+~~~~~~~~~~~~~~
+
+You can specify the ``oracle_bitmap`` parameter to create a bitmap index
+instead of a B-tree index::
+
+ Index('my_index', my_table.c.data, oracle_bitmap=True)
+
+Bitmap indexes cannot be unique and cannot be compressed. SQLAlchemy will not
+check for such limitations, only the database will.
+
+.. versionadded:: 1.0.0
+
+Index compression
+~~~~~~~~~~~~~~~~~
+
+Oracle has a more efficient storage mode for indexes containing lots of
+repeated values. Use the ``oracle_compress`` parameter to turn on key c
+ompression::
+
+ Index('my_index', my_table.c.data, oracle_compress=True)
+
+ Index('my_index', my_table.c.data1, my_table.c.data2, unique=True,
+ oracle_compress=1)
+
+The ``oracle_compress`` parameter accepts either an integer specifying the
+number of prefix columns to compress, or ``True`` to use the default (all
+columns for non-unique indexes, all but the last column for unique indexes).
+
+.. versionadded:: 1.0.0
+
"""
import re
from sqlalchemy import util, sql
-from sqlalchemy.engine import default, base, reflection
+from sqlalchemy.engine import default, reflection
from sqlalchemy.sql import compiler, visitors, expression
-from sqlalchemy.sql import (operators as sql_operators,
- functions as sql_functions)
+from sqlalchemy.sql import operators as sql_operators
from sqlalchemy import types as sqltypes, schema as sa_schema
from sqlalchemy.types import VARCHAR, NVARCHAR, CHAR, \
BLOB, CLOB, TIMESTAMP, FLOAT
return text
- def visit_create_index(self, create, **kw):
- return super(OracleDDLCompiler, self).\
- visit_create_index(create, include_schema=True)
+ def visit_create_index(self, create):
+ index = create.element
+ self._verify_index_table(index)
+ preparer = self.preparer
+ text = "CREATE "
+ if index.unique:
+ text += "UNIQUE "
+ if index.dialect_options['oracle']['bitmap']:
+ text += "BITMAP "
+ text += "INDEX %s ON %s (%s)" % (
+ self._prepared_index_name(index, include_schema=True),
+ preparer.format_table(index.table, use_schema=True),
+ ', '.join(
+ self.sql_compiler.process(
+ expr,
+ include_table=False, literal_binds=True)
+ for expr in index.expressions)
+ )
+ if index.dialect_options['oracle']['compress'] is not False:
+ if index.dialect_options['oracle']['compress'] is True:
+ text += " COMPRESS"
+ else:
+ text += " COMPRESS %d" % (
+ index.dialect_options['oracle']['compress']
+ )
+ return text
def post_create_table(self, table):
table_opts = []
on_commit_options = opts['on_commit'].replace("_", " ").upper()
table_opts.append('\n ON COMMIT %s' % on_commit_options)
+ if opts['compress']:
+ if opts['compress'] is True:
+ table_opts.append("\n COMPRESS")
+ else:
+ table_opts.append("\n COMPRESS FOR %s" % (
+ opts['compress']
+ ))
+
return ''.join(table_opts)
construct_arguments = [
(sa_schema.Table, {
"resolve_synonyms": False,
- "on_commit": None
+ "on_commit": None,
+ "compress": False
+ }),
+ (sa_schema.Index, {
+ "bitmap": False,
+ "compress": False
})
]
return self.server_version_info and \
self.server_version_info < (9, )
+ @property
+ def _supports_table_compression(self):
+ return self.server_version_info and \
+ self.server_version_info >= (9, 2, )
+
+ @property
+ def _supports_table_compress_for(self):
+ return self.server_version_info and \
+ self.server_version_info >= (11, )
+
@property
def _supports_char_length(self):
return not self._is_oracle_8
cursor = connection.execute(s, owner=self.denormalize_name(schema))
return [self.normalize_name(row[0]) for row in cursor]
+ @reflection.cache
+ def get_table_options(self, connection, table_name, schema=None, **kw):
+ options = {}
+
+ resolve_synonyms = kw.get('oracle_resolve_synonyms', False)
+ dblink = kw.get('dblink', '')
+ info_cache = kw.get('info_cache')
+
+ (table_name, schema, dblink, synonym) = \
+ self._prepare_reflection_args(connection, table_name, schema,
+ resolve_synonyms, dblink,
+ info_cache=info_cache)
+
+ params = {"table_name": table_name}
+
+ columns = ["table_name"]
+ if self._supports_table_compression:
+ columns.append("compression")
+ if self._supports_table_compress_for:
+ columns.append("compress_for")
+
+ text = "SELECT %(columns)s "\
+ "FROM ALL_TABLES%(dblink)s "\
+ "WHERE table_name = :table_name"
+
+ if schema is not None:
+ params['owner'] = schema
+ text += " AND owner = :owner "
+ text = text % {'dblink': dblink, 'columns': ", ".join(columns)}
+
+ result = connection.execute(sql.text(text), **params)
+
+ enabled = dict(DISABLED=False, ENABLED=True)
+
+ row = result.first()
+ if row:
+ if "compression" in row and enabled.get(row.compression, False):
+ if "compress_for" in row:
+ options['oracle_compress'] = row.compress_for
+ else:
+ options['oracle_compress'] = True
+
+ return options
+
@reflection.cache
def get_columns(self, connection, table_name, schema=None, **kw):
"""
params = {'table_name': table_name}
text = \
- "SELECT a.index_name, a.column_name, b.uniqueness "\
+ "SELECT a.index_name, a.column_name, "\
+ "\nb.index_type, b.uniqueness, b.compression, b.prefix_length "\
"\nFROM ALL_IND_COLUMNS%(dblink)s a, "\
"\nALL_INDEXES%(dblink)s b "\
"\nWHERE "\
dblink=dblink, info_cache=kw.get('info_cache'))
pkeys = pk_constraint['constrained_columns']
uniqueness = dict(NONUNIQUE=False, UNIQUE=True)
+ enabled = dict(DISABLED=False, ENABLED=True)
oracle_sys_col = re.compile(r'SYS_NC\d+\$', re.IGNORECASE)
if rset.index_name != last_index_name:
remove_if_primary_key(index)
index = dict(name=self.normalize_name(rset.index_name),
- column_names=[])
+ column_names=[], dialect_options={})
indexes.append(index)
index['unique'] = uniqueness.get(rset.uniqueness, False)
+ if rset.index_type in ('BITMAP', 'FUNCTION-BASED BITMAP'):
+ index['dialect_options']['oracle_bitmap'] = True
+ if enabled.get(rset.compression, False):
+ index['dialect_options']['oracle_compress'] = rset.prefix_length
+
# filter out Oracle SYS_NC names. could also do an outer join
# to the all_tab_columns table and check for real col names there.
if not oracle_sys_col.match(rset.column_name):
)
+ def test_create_table_compress(self):
+ m = MetaData()
+ tbl1 = Table('testtbl1', m, Column('data', Integer),
+ oracle_compress=True)
+ tbl2 = Table('testtbl2', m, Column('data', Integer),
+ oracle_compress="OLTP")
+
+ self.assert_compile(schema.CreateTable(tbl1),
+ "CREATE TABLE testtbl1 (data INTEGER) COMPRESS")
+ self.assert_compile(schema.CreateTable(tbl2),
+ "CREATE TABLE testtbl2 (data INTEGER) "
+ "COMPRESS FOR OLTP")
+
+ def test_create_index_bitmap_compress(self):
+ m = MetaData()
+ tbl = Table('testtbl', m, Column('data', Integer))
+ idx1 = Index('idx1', tbl.c.data, oracle_compress=True)
+ idx2 = Index('idx2', tbl.c.data, oracle_compress=1)
+ idx3 = Index('idx3', tbl.c.data, oracle_bitmap=True)
+
+ self.assert_compile(schema.CreateIndex(idx1),
+ "CREATE INDEX idx1 ON testtbl (data) COMPRESS")
+ self.assert_compile(schema.CreateIndex(idx2),
+ "CREATE INDEX idx2 ON testtbl (data) COMPRESS 1")
+ self.assert_compile(schema.CreateIndex(idx3),
+ "CREATE BITMAP INDEX idx3 ON testtbl (data)")
+
+
class CompatFlagsTest(fixtures.TestBase, AssertsCompiledSQL):
def _dialect(self, server_version, **kw):
m2 = MetaData(testing.db)
Table('test_index_reflect', m2, autoload=True)
+
+def all_tables_compression_missing():
+ try:
+ testing.db.execute('SELECT compression FROM all_tables')
+ return False
+ except:
+ return True
+
+
+def all_tables_compress_for_missing():
+ try:
+ testing.db.execute('SELECT compress_for FROM all_tables')
+ return False
+ except:
+ return True
+
+
+class TableReflectionTest(fixtures.TestBase):
+ __only_on__ = 'oracle'
+
+ @testing.provide_metadata
+ @testing.fails_if(all_tables_compression_missing)
+ def test_reflect_basic_compression(self):
+ metadata = self.metadata
+
+ tbl = Table('test_compress', metadata,
+ Column('data', Integer, primary_key=True),
+ oracle_compress=True)
+ metadata.create_all()
+
+ m2 = MetaData(testing.db)
+
+ tbl = Table('test_compress', m2, autoload=True)
+ # Don't hardcode the exact value, but it must be non-empty
+ assert tbl.dialect_options['oracle']['compress']
+
+ @testing.provide_metadata
+ @testing.fails_if(all_tables_compress_for_missing)
+ def test_reflect_oltp_compression(self):
+ metadata = self.metadata
+
+ tbl = Table('test_compress', metadata,
+ Column('data', Integer, primary_key=True),
+ oracle_compress="OLTP")
+ metadata.create_all()
+
+ m2 = MetaData(testing.db)
+
+ tbl = Table('test_compress', m2, autoload=True)
+ assert tbl.dialect_options['oracle']['compress'] == "OLTP"
+
+
class RoundTripIndexTest(fixtures.TestBase):
__only_on__ = 'oracle'
# "group" is a keyword, so lower case
normalind = Index('tableind', table.c.id_b, table.c.group)
+ compress1 = Index('compress1', table.c.id_a, table.c.id_b,
+ oracle_compress=True)
+ compress2 = Index('compress2', table.c.id_a, table.c.id_b, table.c.col,
+ oracle_compress=1)
metadata.create_all()
mirror = MetaData(testing.db)
)
assert (Index, ('id_b', ), True) in reflected
assert (Index, ('col', 'group'), True) in reflected
+
+ idx = reflected[(Index, ('id_a', 'id_b', ), False)]
+ assert idx.dialect_options['oracle']['compress'] == 2
+
+ idx = reflected[(Index, ('id_a', 'id_b', 'col', ), False)]
+ assert idx.dialect_options['oracle']['compress'] == 1
+
eq_(len(reflectedtable.constraints), 1)
- eq_(len(reflectedtable.indexes), 3)
+ eq_(len(reflectedtable.indexes), 5)
class SequenceTest(fixtures.TestBase, AssertsCompiledSQL):