"""Support for the Oracle database via the zxjdbc JDBC connector."""
+import decimal
import re
+try:
+ from com.ziclix.python.sql.handler import OracleDataHandler
+except ImportError:
+ OracleDataHandler = None
+
+from sqlalchemy import types as sqltypes, util
from sqlalchemy.connectors.zxJDBC import ZxJDBCConnector
from sqlalchemy.dialects.oracle.base import OracleDialect
+from sqlalchemy.engine.default import DefaultExecutionContext
-class Oracle_jdbc(ZxJDBCConnector, OracleDialect):
+class _JDBCDate(sqltypes.Date):
+
+ def result_processor(self, dialect):
+ def process(value):
+ if value is None:
+ return None
+ else:
+ return value.date()
+ return process
+
+
+class _JDBCNumeric(sqltypes.Numeric):
+
+ def result_processor(self, dialect):
+ if self.asdecimal:
+ return None
+ else:
+ def process(value):
+ if isinstance(value, decimal.Decimal):
+ return float(value)
+ else:
+ return value
+ return process
+
+
+class Oracle_jdbcExecutionContext(DefaultExecutionContext):
+ def create_cursor(self):
+ cursor = self._connection.connection.cursor()
+ cursor.cursor.datahandler = OracleDataHandler(cursor.cursor.datahandler)
+ return cursor
+
+
+class Oracle_jdbc(ZxJDBCConnector, OracleDialect):
+ execution_ctx_cls = Oracle_jdbcExecutionContext
jdbc_db_name = 'oracle'
jdbc_driver_name = 'oracle.jdbc.driver.OracleDriver'
+ implicit_returning = False
+
+ colspecs = util.update_copy(
+ OracleDialect.colspecs,
+ {
+ sqltypes.Date : _JDBCDate,
+ sqltypes.Numeric: _JDBCNumeric
+ }
+ )
+
+ def initialize(self, connection):
+ super(Oracle_jdbc, self).initialize(connection)
+ self.implicit_returning = False
+
def create_connect_args(self, url):
hostname = url.host
port = url.port or '1521'
jdbc_url = 'jdbc:oracle:thin:@%s:%s:%s' % (hostname, port, dbname)
return [[jdbc_url, url.username, url.password, self.jdbc_driver_name],
self._driver_kwargs()]
-
+
def _get_server_version_info(self, connection):
version = re.search(r'Release ([\d\.]+)', connection.connection.dbversion).group(1)
return tuple(int(x) for x in version.split('.'))
-
+
dialect = Oracle_jdbc
no_support('maxdb', 'not supported by database'),
no_support('sybase', 'not supported by database'),
no_support('informix', 'not supported by database'),
+ no_support('oracle+zxjdbc', 'FIXME: tricky; currently broken'),
)
def two_phase_transactions(fn):
class OutParamTest(TestBase, AssertsExecutionResults):
- __only_on__ = 'oracle'
+ __only_on__ = 'oracle+cx_oracle'
@classmethod
def setup_class(cls):
]:
assert isinstance(start.dialect_impl(dialect), test), "wanted %r got %r" % (test, start.dialect_impl(dialect))
+ @testing.requires.returning
def test_int_not_float(self):
m = MetaData(testing.db)
t1 = Table('t1', m, Column('foo', Integer))
finally:
testing.db.execute("DROP TABLE Z_TEST")
+ @testing.fails_on('+zxjdbc', 'auto_convert_lobs not applicable')
def test_raw_lobs(self):
engine = testing_engine(options=dict(auto_convert_lobs=False))
metadata = MetaData()
ret[c.key] = row[c]
return ret
- if testing.against('firebird', 'postgresql', 'oracle', 'mssql'):
+ if (testing.against('firebird', 'postgresql', 'oracle', 'mssql') and
+ not testing.against('oracle+zxjdbc')):
test_engines = [
engines.testing_engine(options={'implicit_returning':False}),
engines.testing_engine(options={'implicit_returning':True}),
eq_(r.inserted_primary_key, [12, 1])
def test_autoclose_on_insert(self):
- if testing.against('firebird', 'postgresql', 'oracle', 'mssql'):
+ if (testing.against('firebird', 'postgresql', 'oracle', 'mssql') and
+ not testing.against('oracle+zxjdbc')):
test_engines = [
engines.testing_engine(options={'implicit_returning':False}),
engines.testing_engine(options={'implicit_returning':True}),
from sqlalchemy.types import TypeDecorator
class ReturningTest(TestBase, AssertsExecutionResults):
- __unsupported_on__ = ('sqlite', 'mysql', 'maxdb', 'sybase', 'access')
+ __unsupported_on__ = ('sqlite', 'mysql', 'maxdb', 'sybase', 'access', 'oracle+zxjdbc')
def setup(self):
meta = MetaData(testing.db)
eq_(result2.fetchall(), [(2,False),])
class SequenceReturningTest(TestBase):
- __unsupported_on__ = ('sqlite', 'mysql', 'maxdb', 'sybase', 'access', 'mssql')
+ __unsupported_on__ = ('sqlite', 'mysql', 'maxdb', 'sybase', 'access', 'mssql', 'oracle+zxjdbc')
def setup(self):
meta = MetaData(testing.db)
class KeyReturningTest(TestBase, AssertsExecutionResults):
"""test returning() works with columns that define 'key'."""
- __unsupported_on__ = ('sqlite', 'mysql', 'maxdb', 'sybase', 'access')
+ __unsupported_on__ = ('sqlite', 'mysql', 'maxdb', 'sybase', 'access', 'oracle+zxjdbc')
def setup(self):
meta = MetaData(testing.db)