- mysql
- all the _detect_XXX() functions now run once underneath dialect.initialize()
+- oracle
+ - support for cx_Oracle's "native unicode" mode which does not require NLS_LANG
+ to be set. Use the latest 5.0.2 or later of cx_oracle.
+ - an NCLOB type is added to the base types.
+
- new dialects
- pg8000
- pyodbc+mysql
autoload=True
)
+Identifier Casing
+-----------------
+
+In Oracle, the data dictionary represents all case insensitive identifier names
+using UPPERCASE text. SQLAlchemy on the other hand considers an all-lower case identifier
+name to be case insensitive. The Oracle dialect converts all case insensitive identifiers
+to and from those two formats during schema level communication, such as reflection of
+tables and indexes. Using an UPPERCASE name on the SQLAlchemy side indicates a
+case sensitive identifier, and SQLAlchemy will quote the name - this will cause mismatches
+against data dictionary data received from Oracle, so unless identifier names have been
+truly created as case sensitive (i.e. using quoted names), all lowercase names should be
+used on the SQLAlchemy side.
+
+Unicode
+-------
+
+SQLAlchemy 0.6 uses the "native unicode" mode provided as of cx_oracle 5. cx_oracle 5.0.2
+or greater is recommended for support of NCLOB. If not using cx_oracle 5, the NLS_LANG
+environment variable needs to be set in order for the oracle client library to use
+proper encoding, such as "AMERICAN_AMERICA.UTF8".
+
+Also note that Oracle supports unicode data through the NVARCHAR and NCLOB data types.
+When using the SQLAlchemy Unicode and UnicodeText types, these DDL types will be used
+within CREATE TABLE statements. Usage of VARCHAR2 and CLOB with unicode text still
+requires NLS_LANG to be set.
+
LIMIT/OFFSET Support
--------------------
"""
-import datetime, random, re
+import random, re
from sqlalchemy import schema as sa_schema
from sqlalchemy import util, sql, log
RESERVED_WORDS = set('''SHARE RAW DROP BETWEEN FROM DESC OPTION PRIOR LONG THEN DEFAULT ALTER IS INTO MINUS INTEGER NUMBER GRANT IDENTIFIED ALL TO ORDER ON FLOAT DATE HAVING CLUSTER NOWAIT RESOURCE ANY TABLE INDEX FOR UPDATE WHERE CHECK SMALLINT WITH DELETE BY ASC REVOKE LIKE SIZE RENAME NOCOMPRESS NULL GROUP VALUES AS IN VIEW EXCLUSIVE COMPRESS SYNONYM SELECT INSERT EXISTS NOT TRIGGER ELSE CREATE INTERSECT PCTFREE DISTINCT CONNECT SET MODE OF UNIQUE VARCHAR2 VARCHAR LOCK OR CHAR DECIMAL UNION PUBLIC AND START'''.split())
-class OracleDate(sqltypes.Date):
- def bind_processor(self, dialect):
- return None
-
- def result_processor(self, dialect):
- def process(value):
- if not isinstance(value, datetime.datetime):
- return value
- else:
- return value.date()
- return process
-
-class OracleDateTime(sqltypes.DateTime):
- def result_processor(self, dialect):
- def process(value):
- if value is None or isinstance(value, datetime.datetime):
- return value
- else:
- # convert cx_oracle datetime object returned pre-python 2.4
- return datetime.datetime(value.year, value.month,
- value.day,value.hour, value.minute, value.second)
- return process
-
-# Note:
-# Oracle DATE == DATETIME
-# Oracle does not allow milliseconds in DATE
-# Oracle does not support TIME columns
-
-# only if cx_oracle contains TIMESTAMP
-class OracleTimestamp(sqltypes.TIMESTAMP):
- def result_processor(self, dialect):
- def process(value):
- if value is None or isinstance(value, datetime.datetime):
- return value
- else:
- # convert cx_oracle datetime object returned pre-python 2.4
- return datetime.datetime(value.year, value.month,
- value.day,value.hour, value.minute, value.second)
- return process
-
-class OracleText(sqltypes.Text):
- def get_dbapi_type(self, dbapi):
- return dbapi.CLOB
-
- def result_processor(self, dialect):
- super_process = super(OracleText, self).result_processor(dialect)
- if not dialect.auto_convert_lobs:
- return super_process
- lob = dialect.dbapi.LOB
- def process(value):
- if isinstance(value, lob):
- if super_process:
- return super_process(value.read())
- else:
- return value.read()
- else:
- if super_process:
- return super_process(value)
- else:
- return value
- return process
-
-
-class OracleBinary(sqltypes.Binary):
- def get_dbapi_type(self, dbapi):
- return dbapi.BLOB
-
- def bind_processor(self, dialect):
- return None
-
- def result_processor(self, dialect):
- if not dialect.auto_convert_lobs:
- return None
- lob = dialect.dbapi.LOB
- def process(value):
- if isinstance(value, lob):
- return value.read()
- else:
- return value
- return process
-
-class OracleRaw(OracleBinary):
- def get_col_spec(self):
- return "RAW(%(length)s)" % {'length' : self.length}
+class OracleRaw(sqltypes.Binary):
+ pass
class OracleBoolean(sqltypes.Boolean):
def result_processor(self, dialect):
return process
colspecs = {
- sqltypes.DateTime : OracleDateTime,
- sqltypes.Date : OracleDate,
- sqltypes.Binary : OracleBinary,
sqltypes.Boolean : OracleBoolean,
- sqltypes.Text : OracleText,
- sqltypes.TIMESTAMP : OracleTimestamp,
}
ischema_names = {
def visit_datetime(self, type_):
return self.visit_DATE(type_)
+
+ def visit_float(self, type_):
+ return "NUMERIC(%(precision)s, %(scale)s)" % {'precision': type_.precision, 'scale' : 2}
+
+ def visit_unicode(self, type_):
+ return self.visit_NVARCHAR(type_)
def visit_VARCHAR(self, type_):
return "VARCHAR(%(length)s)" % {'length' : type_.length}
def visit_text(self, type_):
return self.visit_CLOB(type_)
+ def visit_unicode_text(self, type_):
+ return self.visit_NCLOB(type_)
+
def visit_binary(self, type_):
return self.visit_BLOB(type_)
"""Oracle doesn't like ``FROM table AS alias``. Is the AS standard SQL??"""
if asfrom:
- return self.process(alias.original, asfrom=asfrom, **kwargs) + " " + self.preparer.format_alias(alias, self._anonymize(alias.name))
+ alias_name = isinstance(alias.name, expression._generated_label) and \
+ self._truncated_identifier("alias", alias.name) or alias.name
+
+ return self.process(alias.original, asfrom=asfrom, **kwargs) + " " + self.preparer.format_alias(alias, alias_name)
else:
return self.process(alias.original, **kwargs)
if coltype == 'NUMBER' :
if precision is None and scale is None:
coltype = sqltypes.NUMERIC
- elif precision is None and scale == 0 :
+ elif precision is None and scale == 0:
coltype = sqltypes.INTEGER
else :
coltype = sqltypes.NUMERIC(precision, scale)
if rset.column_name in [s.upper() for s in pkeys]:
continue
if rset.index_name != last_index_name:
- index = dict(name=rset.index_name, column_names=[])
+ index = dict(name=self._normalize_name(rset.index_name), column_names=[])
indexes.append(index)
index['unique'] = uniqueness.get(rset.uniqueness, False)
- index['column_names'].append(rset.column_name)
+ index['column_names'].append(self._normalize_name(rset.column_name))
last_index_name = rset.index_name
return indexes
"""
-from sqlalchemy.dialects.oracle.base import OracleDialect, OracleText, OracleBinary, OracleRaw, RESERVED_WORDS
+from sqlalchemy.dialects.oracle.base import OracleDialect, OracleRaw, OracleBoolean, RESERVED_WORDS
from sqlalchemy.engine.default import DefaultExecutionContext
from sqlalchemy.engine import base
from sqlalchemy import types as sqltypes, util
+import datetime
-class OracleNVarchar(sqltypes.NVARCHAR):
- """The SQL NVARCHAR type."""
+class OracleDate(sqltypes.Date):
+ def bind_processor(self, dialect):
+ return None
- def __init__(self, **kw):
- kw['convert_unicode'] = False # cx_oracle does this for us, for NVARCHAR2
- sqltypes.NVARCHAR.__init__(self, **kw)
+ def result_processor(self, dialect):
+ def process(value):
+ if not isinstance(value, datetime.datetime):
+ return value
+ else:
+ return value.date()
+ return process
+
+class OracleDateTime(sqltypes.DateTime):
+ def result_processor(self, dialect):
+ def process(value):
+ if value is None or isinstance(value, datetime.datetime):
+ return value
+ else:
+ # convert cx_oracle datetime object returned pre-python 2.4
+ return datetime.datetime(value.year, value.month,
+ value.day,value.hour, value.minute, value.second)
+ return process
+
+# Note:
+# Oracle DATE == DATETIME
+# Oracle does not allow milliseconds in DATE
+# Oracle does not support TIME columns
+
+# only if cx_oracle contains TIMESTAMP
+class OracleTimestamp(sqltypes.TIMESTAMP):
+ def result_processor(self, dialect):
+ def process(value):
+ if value is None or isinstance(value, datetime.datetime):
+ return value
+ else:
+ # convert cx_oracle datetime object returned pre-python 2.4
+ return datetime.datetime(value.year, value.month,
+ value.day,value.hour, value.minute, value.second)
+ return process
+
+class LOBMixin(object):
+ def result_processor(self, dialect):
+ super_process = super(LOBMixin, self).result_processor(dialect)
+ if not dialect.auto_convert_lobs:
+ return super_process
+ lob = dialect.dbapi.LOB
+ def process(value):
+ if isinstance(value, lob):
+ if super_process:
+ return super_process(value.read())
+ else:
+ return value.read()
+ else:
+ if super_process:
+ return super_process(value)
+ else:
+ return value
+ return process
+
+class OracleText(LOBMixin, sqltypes.Text):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.CLOB
+
+class OracleUnicodeText(LOBMixin, sqltypes.UnicodeText):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.NCLOB
+
+
+class OracleBinary(LOBMixin, sqltypes.Binary):
+ def get_dbapi_type(self, dbapi):
+ return dbapi.BLOB
+
+ def bind_processor(self, dialect):
+ return None
+
+
+class cxOracleRaw(LOBMixin, OracleRaw):
+ pass
+
+
+colspecs = {
+ sqltypes.DateTime : OracleDateTime,
+ sqltypes.Date : OracleDate,
+ sqltypes.Binary : OracleBinary,
+ sqltypes.Boolean : OracleBoolean,
+ sqltypes.Text : OracleText,
+ sqltypes.UnicodeText : OracleUnicodeText,
+ sqltypes.TIMESTAMP : OracleTimestamp,
+}
class Oracle_cx_oracleExecutionContext(DefaultExecutionContext):
def pre_exec(self):
class Oracle_cx_oracle(OracleDialect):
execution_ctx_cls = Oracle_cx_oracleExecutionContext
driver = "cx_oracle"
-
- colspecs = util.update_copy(
- OracleDialect.colspecs,
- {
- sqltypes.NVARCHAR:OracleNVarchar
- }
- )
+ colspecs = colspecs
def __init__(self,
auto_setinputsizes=True,
self.supports_timestamp = self.dbapi is None or hasattr(self.dbapi, 'TIMESTAMP' )
self.auto_setinputsizes = auto_setinputsizes
self.auto_convert_lobs = auto_convert_lobs
+
+ def vers(num):
+ return tuple([int(x) for x in num.split('.')])
+
+ if hasattr(self.dbapi, 'version'):
+ cx_oracle_ver = vers(self.dbapi.version)
+ self.supports_unicode_binds = cx_oracle_ver >= (5, 0)
+
if self.dbapi is None or not self.auto_convert_lobs or not 'CLOB' in self.dbapi.__dict__:
self.dbapi_type_map = {}
self.ORACLE_BINARY_TYPES = []
# expect encoded strings or unicodes, etc.
self.dbapi_type_map = {
self.dbapi.CLOB: OracleText(),
+ self.dbapi.NCLOB:OracleUnicodeText(),
self.dbapi.BLOB: OracleBinary(),
self.dbapi.BINARY: OracleRaw(),
}
def visit_CLOB(self, type_):
return "CLOB"
+ def visit_NCLOB(self, type_):
+ return "NCLOB"
+
def visit_VARCHAR(self, type_):
return "VARCHAR" + (type_.length and "(%d)" % type_.length or "")
def visit_integer(self, type_):
return self.visit_INTEGER(type_)
- def visit_float(self, type_):
+ def visit_float(self, type_):
return self.visit_FLOAT(type_)
def visit_numeric(self, type_):
def visit_string(self, type_):
return self.visit_VARCHAR(type_)
+ def visit_unicode(self, type_):
+ return self.visit_VARCHAR(type_)
+
def visit_text(self, type_):
return self.visit_TEXT(type_)
+
+ def visit_unicode_text(self, type_):
+ return self.visit_TEXT(type_)
def visit_null(self, type_):
raise NotImplementedError("Can't generate DDL for the null type")
and col.primary_key
and getattr(col, '_needs_autoincrement', False))]
for c in pk_seqs:
- c.args.append(schema.Sequence(args[0] + '_' + c.name + '_seq', optional=True))
+ c._init_items(schema.Sequence(args[0] + '_' + c.name + '_seq', optional=True))
return schema.Table(*args, **kw)
set(type(reflected_c.type).__mro__).difference(base_mro).intersection(
set(type(c.type).__mro__).difference(base_mro)
)
- ) > 0, "Type '%s' doesn't correspond to type '%s'" % (reflected_c.type, c.type)
+ ) > 0, "On column %r, type '%s' doesn't correspond to type '%s'" % (reflected_c.name, reflected_c.type, c.type)
if isinstance(c.type, sqltypes.String):
eq_(c.type.length, reflected_c.type.length)
# ignore reflection of bogus db-generated DefaultClause()
pass
elif not c.primary_key or not against('postgres'):
- print repr(c)
+ #print repr(c)
assert reflected_c.default is None, reflected_c.default
assert len(table.primary_key) == len(reflected_table.primary_key)
"""
+ __visit_name__ = 'unicode'
+
def __init__(self, length=None, **kwargs):
"""
Create a Unicode-converting String type.
class UnicodeText(Text):
"""A synonym for Text(convert_unicode=True, assert_unicode='warn')."""
+ __visit_name__ = 'unicode_text'
+
def __init__(self, length=None, **kwargs):
"""
Create a Unicode-converting Text type.
__visit_name__ = 'CLOB'
+class NCLOB(Text):
+ """The SQL NCLOB type."""
+
+ __visit_name__ = 'NCLOB'
+
class VARCHAR(String):
"""The SQL VARCHAR type."""
from sqlalchemy.test import *
from sqlalchemy.test.testing import eq_
from sqlalchemy.test.engines import testing_engine
+from sqlalchemy.dialect.oracle import cx_oracle
import os
b = bindparam("foo", u"hello world!")
assert b.type.dialect_impl(dialect).get_dbapi_type(dbapi) == 'STRING'
+ def test_timestamp_adapt(self):
+ dialect = oracle.OracleDialect()
+ t1 = types.DateTime
+ t3 = types.TIMESTAMP
+
+
+ assert isinstance(dialect.type_descriptor(t1), cx_oracle.OracleTimestamp)
+ assert isinstance(dialect.type_descriptor(t3), cx_oracle.OracleTimestamp)
+
+ def test_string_adapt(self):
+ oracle_dialect = oracle.OracleDialect()
+
+ for dialect, start, test in [
+ (oracle_dialect, String(), String),
+ (oracle_dialect, VARCHAR(), VARCHAR),
+ (oracle_dialect, String(50), String),
+ (oracle_dialect, Unicode(), Unicode),
+ (oracle_dialect, UnicodeText(), cx_oracle.OracleUnicodeText),
+ (oracle_dialect, NCHAR(), NCHAR),
+ (oracle_dialect, oracle.OracleRaw(50), oracle.OracleRaw),
+ ]:
+ assert isinstance(start.dialect_impl(dialect), test), "wanted %r got %r" % (test, start.dialect_impl(dialect))
+
+
def test_reflect_raw(self):
types_table = Table(
'all_types', MetaData(testing.db),
global users, metadata
metadata = MetaData(testing.db)
users = Table('users', metadata,
- Column('user_id', INT, primary_key = True),
+ Column('user_id', INT, primary_key = True, test_needs_autoincrement=True),
Column('user_name', VARCHAR(20)),
)
metadata.create_all()
import threading, time
-from sqlalchemy import pool, interfaces, create_engine
+from sqlalchemy import pool, interfaces, create_engine, select
import sqlalchemy as tsa
from sqlalchemy.test import TestBase, testing
from sqlalchemy.test.util import gc_collect, lazy_gc
listener.connect = listener
engine = create_engine(testing.db.url)
engine.pool.add_listener(listener)
- engine.execute('select 1')
+ engine.execute(select([1]))
assert called, "Listener not called on connect"
Column('test1', sa.CHAR(5), nullable=False),
Column('test2', sa.Float(5), nullable=False),
Column('test3', sa.Text),
- Column('test4', sa.Numeric, nullable = False),
+ Column('test4', sa.Numeric(10, 2), nullable = False),
Column('test5', sa.Date),
Column('parent_user_id', sa.Integer,
sa.ForeignKey('engine_users.user_id')),
Column('test8', sa.Binary),
Column('test_passivedefault2', sa.Integer, server_default='5'),
Column('test9', sa.Binary(100)),
- Column('test_numeric', sa.Numeric()),
+ Column('test10', sa.Numeric(10, 2)),
test_needs_fk=True,
)
m9.reflect()
self.assert_(not m9.tables)
- @testing.fails_on_everything_except('postgres', 'mysql', 'sqlite')
+ @testing.fails_on_everything_except('postgres', 'mysql', 'sqlite', 'oracle')
def test_index_reflection(self):
m1 = MetaData(testing.db)
t1 = Table('party', m1,
Column('test1', sa.CHAR(5), nullable=False),
Column('test2', sa.Float(5), nullable=False),
Column('test3', sa.Text),
- Column('test4', sa.Numeric, nullable = False),
+ Column('test4', sa.Numeric(10, 2), nullable = False),
Column('test5', sa.DateTime),
Column('test5-1', sa.TIMESTAMP),
parent_user_id,
Column('test8', sa.Binary),
Column('test_passivedefault2', sa.Integer, server_default='5'),
Column('test9', sa.Binary(100)),
- Column('test_numeric', sa.Numeric()),
+ Column('test10', sa.Numeric(10, 2)),
schema=schema,
test_needs_fk=True,
)
insp = Inspector(meta.bind)
for (table_name, table) in zip(table_names, (users, addresses)):
schema_name = schema
- if schema and testing.against('oracle'):
- schema_name = schema.upper()
cols = insp.get_columns(table_name, schema=schema_name)
self.assert_(len(cols) > 0, len(cols))
# should be in order
insp = Inspector(meta.bind)
indexes = insp.get_indexes('users', schema=schema)
indexes.sort()
- if testing.against('oracle'):
- expected_indexes = [
- {'unique': False,
- 'column_names': ['TEST1', 'TEST2'],
- 'name': 'USERS_T_IDX'}]
- else:
- expected_indexes = [
- {'unique': False,
- 'column_names': ['test1', 'test2'],
- 'name': 'users_t_idx'}]
+ expected_indexes = [
+ {'unique': False,
+ 'column_names': ['test1', 'test2'],
+ 'name': 'users_t_idx'}]
index_names = [d['name'] for d in indexes]
for e_index in expected_indexes:
assert e_index['name'] in index_names
], repr(result)
def test_table_alias_names(self):
- self.assert_compile(
- table2.alias().select(),
- "SELECT table_with_exactly_29_c_1.this_is_the_primarykey_column, table_with_exactly_29_c_1.this_is_the_data_column FROM table_with_exactly_29_characs AS table_with_exactly_29_c_1"
- )
+ if testing.against('oracle'):
+ self.assert_compile(
+ table2.alias().select(),
+ "SELECT table_with_exactly_29_c_1.this_is_the_primarykey_column, table_with_exactly_29_c_1.this_is_the_data_column FROM table_with_exactly_29_characs table_with_exactly_29_c_1"
+ )
+ else:
+ self.assert_compile(
+ table2.alias().select(),
+ "SELECT table_with_exactly_29_c_1.this_is_the_primarykey_column, table_with_exactly_29_c_1.this_is_the_data_column FROM table_with_exactly_29_characs AS table_with_exactly_29_c_1"
+ )
ta = table2.alias()
dialect = default.DefaultDialect()
class AdaptTest(TestBase):
def testmsnvarchar(self):
+ # TODO: migrate these tests to dialect modules
+
dialect = mssql.dialect()
# run the test twice to ensure the caching step works too
for x in range(0, 1):
assert isinstance(dialect_type, mssql.MSNVarchar)
eq_(dialect.type_compiler.process(dialect_type), 'NVARCHAR(10)')
- def testoracletimestamp(self):
- dialect = oracle.OracleDialect()
- t1 = oracle.OracleTimestamp
- t2 = oracle.OracleTimestamp()
- t3 = types.TIMESTAMP
- assert isinstance(dialect.type_descriptor(t1), oracle.OracleTimestamp)
- assert isinstance(dialect.type_descriptor(t2), oracle.OracleTimestamp)
- assert isinstance(dialect.type_descriptor(t3), oracle.OracleTimestamp)
-
def testmysqlbinary(self):
+ # TODO: migrate these tests to dialect modules
+
dialect = mysql.MySQLDialect()
t1 = mysql.MSVarBinary
t2 = mysql.MSVarBinary()
def teststringadapt(self):
"""test that String with no size becomes TEXT, *all* others stay as varchar/String"""
-
- oracle_dialect = oracle.OracleDialect()
+
+ # TODO: migrate these tests to dialect modules
+
mysql_dialect = mysql.MySQLDialect()
postgres_dialect = postgres.PGDialect()
firebird_dialect = firebird.FBDialect()
for dialect, start, test in [
- (oracle_dialect, String(), String),
- (oracle_dialect, VARCHAR(), VARCHAR),
- (oracle_dialect, String(50), String),
- (oracle_dialect, Unicode(), Unicode),
- (oracle_dialect, UnicodeText(), oracle.OracleText),
- (oracle_dialect, NCHAR(), NCHAR),
- (oracle_dialect, oracle.OracleRaw(50), oracle.OracleRaw),
(mysql_dialect, String(), mysql.MSString),
(mysql_dialect, VARCHAR(), mysql.MSString),
(mysql_dialect, String(50), mysql.MSString),