From: Mike Bayer Date: Sat, 20 Mar 2010 02:29:08 +0000 (-0400) Subject: oracle cleanup X-Git-Tag: rel_0_6beta2~4 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=7cf74e9ccaa928d3a5227f7a7846b0a25a7ab9b1;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git oracle cleanup --- diff --git a/lib/sqlalchemy/dialects/oracle/cx_oracle.py b/lib/sqlalchemy/dialects/oracle/cx_oracle.py index c6e9cea5dc..db1647679c 100644 --- a/lib/sqlalchemy/dialects/oracle/cx_oracle.py +++ b/lib/sqlalchemy/dialects/oracle/cx_oracle.py @@ -207,11 +207,19 @@ class OracleCompiler_cx_oracle(OracleCompiler): class OracleExecutionContext_cx_oracle(OracleExecutionContext): def pre_exec(self): - quoted_bind_names = getattr(self.compiled, '_quoted_bind_names', {}) + quoted_bind_names = \ + getattr(self.compiled, '_quoted_bind_names', None) if quoted_bind_names: + if not self.dialect.supports_unicode_binds: + quoted_bind_names = \ + dict( + (fromname, toname.encode(self.dialect.encoding)) + for fromname, toname in + quoted_bind_names.items() + ) for param in self.parameters: - for fromname, toname in self.compiled._quoted_bind_names.iteritems(): - param[toname.encode(self.dialect.encoding)] = param[fromname] + for fromname, toname in quoted_bind_names.items(): + param[toname] = param[fromname] del param[fromname] if self.dialect.auto_setinputsizes: @@ -219,14 +227,12 @@ class OracleExecutionContext_cx_oracle(OracleExecutionContext): # on String, including that outparams/RETURNING # breaks for varchars self.set_input_sizes(quoted_bind_names, - exclude_types=self.dialect._cx_oracle_string_types + exclude_types=self.dialect._cx_oracle_string_types ) - + + # if a single execute, check for outparams if len(self.compiled_parameters) == 1: - for key in self.compiled.binds: - bindparam = self.compiled.binds[key] - name = self.compiled.bind_names[bindparam] - value = self.compiled_parameters[0][name] + for bindparam in self.compiled.binds.values(): if bindparam.isoutparam: dbtype = bindparam.type.dialect_impl(self.dialect).\ get_dbapi_type(self.dialect.dbapi) @@ -238,6 +244,7 @@ class OracleExecutionContext_cx_oracle(OracleExecutionContext): " cx_oracle" % (name, bindparam.type) ) + name = self.compiled.bind_names[bindparam] self.out_parameters[name] = self.cursor.var(dbtype) self.parameters[0][quoted_bind_names.get(name, name)] = \ self.out_parameters[name] @@ -250,7 +257,10 @@ class OracleExecutionContext_cx_oracle(OracleExecutionContext): def get_result_proxy(self): if hasattr(self, 'out_parameters') and self.compiled.returning: - returning_params = dict((k, v.getvalue()) for k, v in self.out_parameters.items()) + returning_params = dict( + (k, v.getvalue()) + for k, v in self.out_parameters.items() + ) return ReturningResultProxy(self, returning_params) result = None @@ -264,10 +274,11 @@ class OracleExecutionContext_cx_oracle(OracleExecutionContext): result = base.ResultProxy(self) if hasattr(self, 'out_parameters'): - if self.compiled_parameters is not None and len(self.compiled_parameters) == 1: + if self.compiled_parameters is not None and \ + len(self.compiled_parameters) == 1: result.out_parameters = out_parameters = {} - for bind, name in self.compiled.bind_names.iteritems(): + for bind, name in self.compiled.bind_names.items(): if name in self.out_parameters: type = bind.type impl_type = type.dialect_impl(self.dialect) @@ -291,12 +302,14 @@ class OracleExecutionContext_cx_oracle(OracleExecutionContext): class OracleExecutionContext_cx_oracle_with_unicode(OracleExecutionContext_cx_oracle): """Support WITH_UNICODE in Python 2.xx. - WITH_UNICODE allows cx_Oracle's Python 3 unicode handling behavior under Python 2.x. - This mode in some cases disallows and in other cases silently - passes corrupted data when non-Python-unicode strings (a.k.a. plain old Python strings) - are passed as arguments to connect(), the statement sent to execute(), or any of the bind - parameter keys or values sent to execute(). This optional context - therefore ensures that all statements are passed as Python unicode objects. + WITH_UNICODE allows cx_Oracle's Python 3 unicode handling + behavior under Python 2.x. This mode in some cases disallows + and in other cases silently passes corrupted data when + non-Python-unicode strings (a.k.a. plain old Python strings) + are passed as arguments to connect(), the statement sent to execute(), + or any of the bind parameter keys or values sent to execute(). + This optional context therefore ensures that all statements are + passed as Python unicode objects. """ def __init__(self, *arg, **kw): @@ -373,17 +386,19 @@ class OracleDialect_cx_oracle(OracleDialect): if hasattr(self.dbapi, 'version'): cx_oracle_ver = tuple([int(x) for x in self.dbapi.version.split('.')]) - self.supports_unicode_binds = cx_oracle_ver >= (5, 0) - self._cx_oracle_native_nvarchar = cx_oracle_ver >= (5, 0) else: cx_oracle_ver = None def types(*names): - return set([getattr(self.dbapi, name, None) for name in names]).difference([None]) + return set([ + getattr(self.dbapi, name, None) for name in names + ]).difference([None]) self._cx_oracle_string_types = types("STRING", "UNICODE", "NCLOB", "CLOB") self._cx_oracle_unicode_types = types("UNICODE", "NCLOB") self._cx_oracle_binary_types = types("BFILE", "CLOB", "NCLOB", "BLOB") + self.supports_unicode_binds = cx_oracle_ver >= (5, 0) + self._cx_oracle_native_nvarchar = cx_oracle_ver >= (5, 0) if cx_oracle_ver is None: # this occurs in tests with mock DBAPIs diff --git a/test/dialect/test_oracle.py b/test/dialect/test_oracle.py index 35cfd77b76..566a182ed4 100644 --- a/test/dialect/test_oracle.py +++ b/test/dialect/test_oracle.py @@ -33,8 +33,16 @@ create or replace procedure foo(x_in IN number, x_out OUT number, y_out OUT numb def test_out_params(self): result = testing.db.execute(text("begin foo(:x_in, :x_out, :y_out, :z_out); end;", - bindparams=[bindparam('x_in', Numeric), outparam('x_out', Integer), outparam('y_out', Numeric), outparam('z_out', String)]), x_in=5) - assert result.out_parameters == {'x_out':10, 'y_out':75, 'z_out':None}, result.out_parameters + bindparams=[ + bindparam('x_in', Numeric), + outparam('x_out', Integer), + outparam('y_out', Numeric), + outparam('z_out', String)]), + x_in=5) + eq_( + result.out_parameters, + {'x_out':10, 'y_out':75, 'z_out':None} + ) assert isinstance(result.out_parameters['x_out'], int) @classmethod @@ -54,7 +62,9 @@ class CompileTest(TestBase, AssertsCompiledSQL): Column('parent_id', Integer, ForeignKey('ed.parent.id')), schema = 'ed') - self.assert_compile(parent.join(child), "ed.parent JOIN ed.child ON ed.parent.id = ed.child.parent_id") + self.assert_compile( + parent.join(child), + "ed.parent JOIN ed.child ON ed.parent.id = ed.child.parent_id") def test_subquery(self): t = table('sometable', column('col1'), column('col2')) @@ -62,7 +72,8 @@ class CompileTest(TestBase, AssertsCompiledSQL): s = select([s.c.col1, s.c.col2]) self.assert_compile(s, "SELECT col1, col2 FROM (SELECT " - "sometable.col1 AS col1, sometable.col2 AS col2 FROM sometable)") + "sometable.col1 AS col1, sometable.col2 " + "AS col2 FROM sometable)") def test_limit(self): t = table('sometable', column('col1'), column('col2')) @@ -73,7 +84,8 @@ class CompileTest(TestBase, AssertsCompiledSQL): s = select([t]).limit(10).offset(20) - self.assert_compile(s, "SELECT col1, col2 FROM (SELECT col1, col2, ROWNUM AS ora_rn " + self.assert_compile(s, + "SELECT col1, col2 FROM (SELECT col1, col2, ROWNUM AS ora_rn " "FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2 " "FROM sometable) WHERE ROWNUM <= :ROWNUM_1) WHERE ora_rn > :ora_rn_1" ) @@ -128,15 +140,17 @@ class CompileTest(TestBase, AssertsCompiledSQL): anon = a_table.alias() self.assert_compile( - select([other_table, anon]).select_from( other_table.outerjoin(anon) ).apply_labels(), "SELECT other_thirty_characters_table_.id AS other_thirty_characters__1, " - "other_thirty_characters_table_.thirty_characters_table_id AS other_thirty_characters__2, " - "thirty_characters_table__1.id AS thirty_characters_table__3 FROM other_thirty_characters_table_ " - "LEFT OUTER JOIN thirty_characters_table_xxxxxx AS thirty_characters_table__1 ON " - "thirty_characters_table__1.id = other_thirty_characters_table_.thirty_characters_table_id", + "other_thirty_characters_table_.thirty_characters_table_id AS " + "other_thirty_characters__2, " + "thirty_characters_table__1.id AS thirty_characters_table__3 FROM " + "other_thirty_characters_table_ " + "LEFT OUTER JOIN thirty_characters_table_xxxxxx AS thirty_characters_table__1 " + "ON thirty_characters_table__1.id = " + "other_thirty_characters_table_.thirty_characters_table_id", dialect=dialect ) self.assert_compile( @@ -145,10 +159,13 @@ class CompileTest(TestBase, AssertsCompiledSQL): other_table.outerjoin(anon) ).apply_labels(), "SELECT other_thirty_characters_table_.id AS other_thirty_characters__1, " - "other_thirty_characters_table_.thirty_characters_table_id AS other_thirty_characters__2, " - "thirty_characters_table__1.id AS thirty_characters_table__3 FROM other_thirty_characters_table_ " + "other_thirty_characters_table_.thirty_characters_table_id AS " + "other_thirty_characters__2, " + "thirty_characters_table__1.id AS thirty_characters_table__3 FROM " + "other_thirty_characters_table_ " "LEFT OUTER JOIN thirty_characters_table_xxxxxx thirty_characters_table__1 ON " - "thirty_characters_table__1.id = other_thirty_characters_table_.thirty_characters_table_id", + "thirty_characters_table__1.id = " + "other_thirty_characters_table_.thirty_characters_table_id", dialect=ora_dialect ) @@ -185,57 +202,84 @@ class CompileTest(TestBase, AssertsCompiledSQL): "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, " "myothertable.othername FROM mytable, myothertable WHERE " "(mytable.name = :name_1 OR mytable.myid = :myid_1 OR " - "myothertable.othername != :othername_1 OR EXISTS (select yay from foo where boo = lar)) " + "myothertable.othername != :othername_1 OR EXISTS (select yay " + "from foo where boo = lar)) " "AND mytable.myid = myothertable.otherid(+)", dialect=oracle.OracleDialect(use_ansi = False)) - query = table1.outerjoin(table2, table1.c.myid==table2.c.otherid).outerjoin(table3, table3.c.userid==table2.c.otherid) - self.assert_compile(query.select(), "SELECT mytable.myid, mytable.name, mytable.description, " - "myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff " - "FROM mytable LEFT OUTER JOIN myothertable ON mytable.myid = myothertable.otherid LEFT OUTER " - "JOIN thirdtable ON thirdtable.userid = myothertable.otherid") - self.assert_compile(query.select(), "SELECT mytable.myid, mytable.name, mytable.description, " - "myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM " - "mytable, myothertable, thirdtable WHERE thirdtable.userid(+) = myothertable.otherid AND " - "mytable.myid = myothertable.otherid(+)", dialect=oracle.dialect(use_ansi=False)) - - query = table1.join(table2, table1.c.myid==table2.c.otherid).join(table3, table3.c.userid==table2.c.otherid) - self.assert_compile(query.select(), "SELECT mytable.myid, mytable.name, mytable.description, " - "myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM " - "mytable, myothertable, thirdtable WHERE thirdtable.userid = myothertable.otherid AND " + query = table1.outerjoin(table2, table1.c.myid==table2.c.otherid).\ + outerjoin(table3, table3.c.userid==table2.c.otherid) + self.assert_compile(query.select(), + "SELECT mytable.myid, mytable.name, mytable.description, " + "myothertable.otherid, myothertable.othername, thirdtable.userid," + " thirdtable.otherstuff " + "FROM mytable LEFT OUTER JOIN myothertable ON mytable.myid =" + " myothertable.otherid LEFT OUTER " + "JOIN thirdtable ON thirdtable.userid = myothertable.otherid") + + self.assert_compile(query.select(), + "SELECT mytable.myid, mytable.name, mytable.description, " + "myothertable.otherid, myothertable.othername, thirdtable.userid," + " thirdtable.otherstuff FROM " + "mytable, myothertable, thirdtable WHERE thirdtable.userid(+) =" + " myothertable.otherid AND " + "mytable.myid = myothertable.otherid(+)", + dialect=oracle.dialect(use_ansi=False)) + + query = table1.join(table2, table1.c.myid==table2.c.otherid).\ + join(table3, table3.c.userid==table2.c.otherid) + self.assert_compile(query.select(), + "SELECT mytable.myid, mytable.name, mytable.description, " + "myothertable.otherid, myothertable.othername, thirdtable.userid, " + "thirdtable.otherstuff FROM " + "mytable, myothertable, thirdtable WHERE thirdtable.userid = " + "myothertable.otherid AND " "mytable.myid = myothertable.otherid", dialect=oracle.dialect(use_ansi=False)) - query = table1.join(table2, table1.c.myid==table2.c.otherid).outerjoin(table3, table3.c.userid==table2.c.otherid) + query = table1.join(table2, table1.c.myid==table2.c.otherid).\ + outerjoin(table3, table3.c.userid==table2.c.otherid) self.assert_compile(query.select().order_by(table1.c.name).limit(10).offset(5), "SELECT myid, name, description, otherid, othername, userid, " "otherstuff FROM (SELECT myid, name, description, " - "otherid, othername, userid, otherstuff, ROWNUM AS ora_rn FROM (SELECT " - "mytable.myid AS myid, mytable.name AS name, mytable.description AS description, " - "myothertable.otherid AS otherid, myothertable.othername AS othername, " - "thirdtable.userid AS userid, thirdtable.otherstuff AS otherstuff FROM mytable, " - "myothertable, thirdtable WHERE thirdtable.userid(+) = myothertable.otherid AND " - "mytable.myid = myothertable.otherid ORDER BY mytable.name) WHERE " - "ROWNUM <= :ROWNUM_1) WHERE ora_rn > :ora_rn_1", dialect=oracle.dialect(use_ansi=False)) + "otherid, othername, userid, otherstuff, " + "ROWNUM AS ora_rn FROM (SELECT " + "mytable.myid AS myid, mytable.name AS name, " + "mytable.description AS description, " + "myothertable.otherid AS otherid, myothertable.othername " + "AS othername, " + "thirdtable.userid AS userid, thirdtable.otherstuff AS " + "otherstuff FROM mytable, " + "myothertable, thirdtable WHERE thirdtable.userid(+) = " + "myothertable.otherid AND " + "mytable.myid = myothertable.otherid ORDER BY " + "mytable.name) WHERE " + "ROWNUM <= :ROWNUM_1) WHERE ora_rn > :ora_rn_1", + dialect=oracle.dialect(use_ansi=False)) subq = select([table1]).\ - select_from(table1.outerjoin(table2, table1.c.myid==table2.c.otherid)).alias() - q = select([table3]).select_from(table3.outerjoin(subq, table3.c.userid==subq.c.myid)) + select_from( + table1.outerjoin(table2, table1.c.myid==table2.c.otherid) + ).alias() + q = select([table3]).select_from( + table3.outerjoin(subq, table3.c.userid==subq.c.myid) + ) self.assert_compile(q, "SELECT thirdtable.userid, thirdtable.otherstuff " - "FROM thirdtable LEFT OUTER JOIN (SELECT mytable.myid AS myid, mytable.name" + "FROM thirdtable LEFT OUTER JOIN (SELECT mytable.myid AS " + "myid, mytable.name" " AS name, mytable.description AS description " "FROM mytable LEFT OUTER JOIN myothertable ON mytable.myid = " "myothertable.otherid) anon_1 ON thirdtable.userid = anon_1.myid", - dialect=oracle.dialect(use_ansi=True)) + dialect=oracle.dialect(use_ansi=True)) self.assert_compile(q, "SELECT thirdtable.userid, thirdtable.otherstuff " - "FROM thirdtable, (SELECT mytable.myid AS myid, mytable.name AS name, " - "mytable.description AS description FROM mytable, myothertable " - "WHERE mytable.myid = myothertable.otherid(+)) anon_1 " - "WHERE thirdtable.userid = anon_1.myid(+)", - dialect=oracle.dialect(use_ansi=False)) + "FROM thirdtable, (SELECT mytable.myid AS myid, mytable.name AS name, " + "mytable.description AS description FROM mytable, myothertable " + "WHERE mytable.myid = myothertable.otherid(+)) anon_1 " + "WHERE thirdtable.userid = anon_1.myid(+)", + dialect=oracle.dialect(use_ansi=False)) def test_alias_outer_join(self): address_types = table('address_types', @@ -251,13 +295,20 @@ class CompileTest(TestBase, AssertsCompiledSQL): at_alias = address_types.alias() s = select([at_alias, addresses]).\ - select_from(addresses.outerjoin(at_alias, addresses.c.address_type_id==at_alias.c.id)).\ + select_from( + addresses.outerjoin(at_alias, + addresses.c.address_type_id==at_alias.c.id) + ).\ where(addresses.c.user_id==7).\ order_by(addresses.c.id, address_types.c.id) - self.assert_compile(s, "SELECT address_types_1.id, address_types_1.name, addresses.id, addresses.user_id, " - "addresses.address_type_id, addresses.email_address FROM addresses LEFT OUTER JOIN address_types address_types_1 " - "ON addresses.address_type_id = address_types_1.id WHERE addresses.user_id = :user_id_1 ORDER BY addresses.id, " - "address_types.id") + self.assert_compile(s, + "SELECT address_types_1.id, address_types_1.name, addresses.id, " + "addresses.user_id, " + "addresses.address_type_id, addresses.email_address FROM addresses " + "LEFT OUTER JOIN address_types address_types_1 " + "ON addresses.address_type_id = address_types_1.id WHERE " + "addresses.user_id = :user_id_1 ORDER BY addresses.id, " + "address_types.id") def test_compound(self): t1 = table('t1', column('c1'), column('c2'), column('c3'), ) @@ -295,8 +346,10 @@ create table test_schema.child( create synonym test_schema.ptable for test_schema.parent; create synonym test_schema.ctable for test_schema.child; --- can't make a ref from local schema to the remote schema's table without this, --- *and* cant give yourself a grant ! so we give it to public. ideas welcome. +-- can't make a ref from local schema to the +-- remote schema's table without this, +-- *and* cant give yourself a grant ! +-- so we give it to public. ideas welcome. grant references on test_schema.parent to public; grant references on test_schema.child to public; """.split(";"): @@ -357,8 +410,12 @@ drop synonym test_schema.ptable; parent = Table('parent', meta, autoload=True, schema='test_schema') child = Table('child', meta, autoload=True, schema='test_schema') - self.assert_compile(parent.join(child), "test_schema.parent JOIN test_schema.child ON test_schema.parent.id = test_schema.child.parent_id") - select([parent, child]).select_from(parent.join(child)).execute().fetchall() + self.assert_compile(parent.join(child), + "test_schema.parent JOIN test_schema.child ON " + "test_schema.parent.id = test_schema.child.parent_id") + select([parent, child]).\ + select_from(parent.join(child)).\ + execute().fetchall() def test_reflect_local_to_remote(self): testing.db.execute("CREATE TABLE localtable " @@ -368,8 +425,12 @@ drop synonym test_schema.ptable; meta = MetaData(testing.db) lcl = Table('localtable', meta, autoload=True) parent = meta.tables['test_schema.parent'] - self.assert_compile(parent.join(lcl), "test_schema.parent JOIN localtable ON test_schema.parent.id = localtable.parent_id") - select([parent, lcl]).select_from(parent.join(lcl)).execute().fetchall() + self.assert_compile(parent.join(lcl), + "test_schema.parent JOIN localtable ON " + "test_schema.parent.id = localtable.parent_id") + select([parent, lcl]).\ + select_from(parent.join(lcl)).\ + execute().fetchall() finally: testing.db.execute("DROP TABLE localtable") @@ -378,7 +439,9 @@ drop synonym test_schema.ptable; parent = Table('parent', meta, autoload=True, schema='test_schema') child = Table('child', meta, autoload=True, schema='test_schema') - self.assert_compile(parent.join(child), "test_schema.parent JOIN test_schema.child ON test_schema.parent.id = test_schema.child.parent_id") + self.assert_compile(parent.join(child), + "test_schema.parent JOIN test_schema.child ON " + "test_schema.parent.id = test_schema.child.parent_id") select([parent, child]).select_from(parent.join(child)).execute().fetchall() def test_reflect_alt_owner_synonyms(self): @@ -389,44 +452,54 @@ drop synonym test_schema.ptable; meta = MetaData(testing.db) lcl = Table('localtable', meta, autoload=True, oracle_resolve_synonyms=True) parent = meta.tables['test_schema.ptable'] - self.assert_compile(parent.join(lcl), "test_schema.ptable JOIN localtable ON test_schema.ptable.id = localtable.parent_id") + self.assert_compile(parent.join(lcl), + "test_schema.ptable JOIN localtable ON " + "test_schema.ptable.id = localtable.parent_id") select([parent, lcl]).select_from(parent.join(lcl)).execute().fetchall() finally: testing.db.execute("DROP TABLE localtable") def test_reflect_remote_synonyms(self): meta = MetaData(testing.db) - parent = Table('ptable', meta, autoload=True, schema='test_schema', oracle_resolve_synonyms=True) - child = Table('ctable', meta, autoload=True, schema='test_schema', oracle_resolve_synonyms=True) - self.assert_compile(parent.join(child), "test_schema.ptable JOIN test_schema.ctable ON test_schema.ptable.id = test_schema.ctable.parent_id") + parent = Table('ptable', meta, autoload=True, + schema='test_schema', + oracle_resolve_synonyms=True) + child = Table('ctable', meta, autoload=True, + schema='test_schema', + oracle_resolve_synonyms=True) + self.assert_compile(parent.join(child), + "test_schema.ptable JOIN test_schema.ctable ON " + "test_schema.ptable.id = test_schema.ctable.parent_id") select([parent, child]).select_from(parent.join(child)).execute().fetchall() class ConstraintTest(TestBase): __only_on__ = 'oracle' - def test_oracle_has_no_on_update_cascade(self): - m = MetaData(testing.db) + def setup(self): + global metadata + metadata = MetaData(testing.db) - foo = Table('foo', m, + foo = Table('foo', metadata, Column('id', Integer, primary_key=True), ) foo.create(checkfirst=True) - try: - bar = Table('bar', m, - Column('id', Integer, primary_key=True), - Column('foo_id', Integer, ForeignKey('foo.id', onupdate="CASCADE")) - ) - assert_raises(exc.SAWarning, bar.create) + + def teardown(self): + metadata.drop_all() - bat = Table('bat', m, - Column('id', Integer, primary_key=True), - Column('foo_id', Integer), - ForeignKeyConstraint(['foo_id'], ['foo.id'], onupdate="CASCADE") - ) - assert_raises(exc.SAWarning, bat.create) - - finally: - m.drop_all() + def test_oracle_has_no_on_update_cascade(self): + bar = Table('bar', metadata, + Column('id', Integer, primary_key=True), + Column('foo_id', Integer, ForeignKey('foo.id', onupdate="CASCADE")) + ) + assert_raises(exc.SAWarning, bar.create) + + bat = Table('bat', metadata, + Column('id', Integer, primary_key=True), + Column('foo_id', Integer), + ForeignKeyConstraint(['foo_id'], ['foo.id'], onupdate="CASCADE") + ) + assert_raises(exc.SAWarning, bat.create) class TypesTest(TestBase, AssertsCompiledSQL): @@ -434,8 +507,9 @@ class TypesTest(TestBase, AssertsCompiledSQL): __dialect__ = oracle.OracleDialect() def test_no_clobs_for_string_params(self): - """test that simple string params get a DBAPI type of VARCHAR, not CLOB. - this is to prevent setinputsizes from setting up cx_oracle.CLOBs on + """test that simple string params get a DBAPI type of + VARCHAR, not CLOB. This is to prevent setinputsizes + from setting up cx_oracle.CLOBs on string-based bind params [ticket:793].""" class FakeDBAPI(object): @@ -497,7 +571,8 @@ class TypesTest(TestBase, AssertsCompiledSQL): (NCHAR(), cx_oracle._OracleNVarChar), (oracle.RAW(50), cx_oracle._OracleRaw), ]: - assert isinstance(start.dialect_impl(dialect), test), "wanted %r got %r" % (test, start.dialect_impl(dialect)) + assert isinstance(start.dialect_impl(dialect), test), \ + "wanted %r got %r" % (test, start.dialect_impl(dialect)) @testing.requires.returning def test_int_not_float(self): @@ -516,14 +591,24 @@ class TypesTest(TestBase, AssertsCompiledSQL): finally: t1.drop() - @testing.fails_on('+zxjdbc', 'Not yet known how to pass values of the INTERVAL type') + @testing.fails_on('+zxjdbc', + 'Not yet known how to pass values of the INTERVAL type') def test_interval(self): for type_, expected in [ (oracle.INTERVAL(), "INTERVAL DAY TO SECOND"), - (oracle.INTERVAL(day_precision=3), "INTERVAL DAY(3) TO SECOND"), - (oracle.INTERVAL(second_precision=5), "INTERVAL DAY TO SECOND(5)"), - (oracle.INTERVAL(day_precision=2, second_precision=5), "INTERVAL DAY(2) TO SECOND(5)"), + ( + oracle.INTERVAL(day_precision=3), + "INTERVAL DAY(3) TO SECOND" + ), + ( + oracle.INTERVAL(second_precision=5), + "INTERVAL DAY TO SECOND(5)" + ), + ( + oracle.INTERVAL(day_precision=2, second_precision=5), + "INTERVAL DAY(2) TO SECOND(5)" + ), ]: self.assert_compile(type_, expected) @@ -617,13 +702,13 @@ class TypesTest(TestBase, AssertsCompiledSQL): metadata.drop_all() def test_reflect_raw(self): - types_table = Table( - 'all_types', MetaData(testing.db), + types_table = Table('all_types', MetaData(testing.db), Column('owner', String(30), primary_key=True), Column('type_name', String(30), primary_key=True), autoload=True, oracle_resolve_synonyms=True ) - [[row[k] for k in row.keys()] for row in types_table.select().execute().fetchall()] + for row in types_table.select().execute().fetchall(): + [row[k] for k in row.keys()] def test_reflect_nvarchar(self): metadata = MetaData(testing.db) @@ -675,7 +760,9 @@ class TypesTest(TestBase, AssertsCompiledSQL): Column('data', Text), Column('bindata', LargeBinary)) t.create(engine) try: - engine.execute(t.insert(), id=1, data='this is text', bindata='this is binary') + engine.execute(t.insert(), id=1, + data='this is text', + bindata='this is binary') row = engine.execute(t.select()).first() eq_(row['data'].read(), 'this is text') eq_(row['bindata'].read(), 'this is binary') @@ -725,7 +812,9 @@ class BufferedColumnTest(TestBase, AssertsCompiledSQL): Column('data', LargeBinary) ) meta.create_all() - stream = os.path.join(os.path.dirname(__file__), "..", 'binary_data_one.dat') + stream = os.path.join( + os.path.dirname(__file__), "..", + 'binary_data_one.dat') stream = file(stream).read(12000) for i in range(1, 11): @@ -751,7 +840,9 @@ class UnsupportedIndexReflectTest(TestBase): def setup(self): global metadata metadata = MetaData(testing.db) - t1 = Table('test_index_reflect', metadata, Column('data', String(20), primary_key=True)) + t1 = Table('test_index_reflect', metadata, + Column('data', String(20), primary_key=True) + ) metadata.create_all() def teardown(self):