class OracleExecutionContext_cx_oracle(OracleExecutionContext):
def pre_exec(self):
- quoted_bind_names = getattr(self.compiled, '_quoted_bind_names', {})
+ quoted_bind_names = \
+ getattr(self.compiled, '_quoted_bind_names', None)
if quoted_bind_names:
+ if not self.dialect.supports_unicode_binds:
+ quoted_bind_names = \
+ dict(
+ (fromname, toname.encode(self.dialect.encoding))
+ for fromname, toname in
+ quoted_bind_names.items()
+ )
for param in self.parameters:
- for fromname, toname in self.compiled._quoted_bind_names.iteritems():
- param[toname.encode(self.dialect.encoding)] = param[fromname]
+ for fromname, toname in quoted_bind_names.items():
+ param[toname] = param[fromname]
del param[fromname]
if self.dialect.auto_setinputsizes:
# on String, including that outparams/RETURNING
# breaks for varchars
self.set_input_sizes(quoted_bind_names,
- exclude_types=self.dialect._cx_oracle_string_types
+ exclude_types=self.dialect._cx_oracle_string_types
)
-
+
+ # if a single execute, check for outparams
if len(self.compiled_parameters) == 1:
- for key in self.compiled.binds:
- bindparam = self.compiled.binds[key]
- name = self.compiled.bind_names[bindparam]
- value = self.compiled_parameters[0][name]
+ for bindparam in self.compiled.binds.values():
if bindparam.isoutparam:
dbtype = bindparam.type.dialect_impl(self.dialect).\
get_dbapi_type(self.dialect.dbapi)
" cx_oracle" %
(name, bindparam.type)
)
+ name = self.compiled.bind_names[bindparam]
self.out_parameters[name] = self.cursor.var(dbtype)
self.parameters[0][quoted_bind_names.get(name, name)] = \
self.out_parameters[name]
def get_result_proxy(self):
if hasattr(self, 'out_parameters') and self.compiled.returning:
- returning_params = dict((k, v.getvalue()) for k, v in self.out_parameters.items())
+ returning_params = dict(
+ (k, v.getvalue())
+ for k, v in self.out_parameters.items()
+ )
return ReturningResultProxy(self, returning_params)
result = None
result = base.ResultProxy(self)
if hasattr(self, 'out_parameters'):
- if self.compiled_parameters is not None and len(self.compiled_parameters) == 1:
+ if self.compiled_parameters is not None and \
+ len(self.compiled_parameters) == 1:
result.out_parameters = out_parameters = {}
- for bind, name in self.compiled.bind_names.iteritems():
+ for bind, name in self.compiled.bind_names.items():
if name in self.out_parameters:
type = bind.type
impl_type = type.dialect_impl(self.dialect)
class OracleExecutionContext_cx_oracle_with_unicode(OracleExecutionContext_cx_oracle):
"""Support WITH_UNICODE in Python 2.xx.
- WITH_UNICODE allows cx_Oracle's Python 3 unicode handling behavior under Python 2.x.
- This mode in some cases disallows and in other cases silently
- passes corrupted data when non-Python-unicode strings (a.k.a. plain old Python strings)
- are passed as arguments to connect(), the statement sent to execute(), or any of the bind
- parameter keys or values sent to execute(). This optional context
- therefore ensures that all statements are passed as Python unicode objects.
+ WITH_UNICODE allows cx_Oracle's Python 3 unicode handling
+ behavior under Python 2.x. This mode in some cases disallows
+ and in other cases silently passes corrupted data when
+ non-Python-unicode strings (a.k.a. plain old Python strings)
+ are passed as arguments to connect(), the statement sent to execute(),
+ or any of the bind parameter keys or values sent to execute().
+ This optional context therefore ensures that all statements are
+ passed as Python unicode objects.
"""
def __init__(self, *arg, **kw):
if hasattr(self.dbapi, 'version'):
cx_oracle_ver = tuple([int(x) for x in self.dbapi.version.split('.')])
- self.supports_unicode_binds = cx_oracle_ver >= (5, 0)
- self._cx_oracle_native_nvarchar = cx_oracle_ver >= (5, 0)
else:
cx_oracle_ver = None
def types(*names):
- return set([getattr(self.dbapi, name, None) for name in names]).difference([None])
+ return set([
+ getattr(self.dbapi, name, None) for name in names
+ ]).difference([None])
self._cx_oracle_string_types = types("STRING", "UNICODE", "NCLOB", "CLOB")
self._cx_oracle_unicode_types = types("UNICODE", "NCLOB")
self._cx_oracle_binary_types = types("BFILE", "CLOB", "NCLOB", "BLOB")
+ self.supports_unicode_binds = cx_oracle_ver >= (5, 0)
+ self._cx_oracle_native_nvarchar = cx_oracle_ver >= (5, 0)
if cx_oracle_ver is None:
# this occurs in tests with mock DBAPIs
def test_out_params(self):
result = testing.db.execute(text("begin foo(:x_in, :x_out, :y_out, :z_out); end;",
- bindparams=[bindparam('x_in', Numeric), outparam('x_out', Integer), outparam('y_out', Numeric), outparam('z_out', String)]), x_in=5)
- assert result.out_parameters == {'x_out':10, 'y_out':75, 'z_out':None}, result.out_parameters
+ bindparams=[
+ bindparam('x_in', Numeric),
+ outparam('x_out', Integer),
+ outparam('y_out', Numeric),
+ outparam('z_out', String)]),
+ x_in=5)
+ eq_(
+ result.out_parameters,
+ {'x_out':10, 'y_out':75, 'z_out':None}
+ )
assert isinstance(result.out_parameters['x_out'], int)
@classmethod
Column('parent_id', Integer, ForeignKey('ed.parent.id')),
schema = 'ed')
- self.assert_compile(parent.join(child), "ed.parent JOIN ed.child ON ed.parent.id = ed.child.parent_id")
+ self.assert_compile(
+ parent.join(child),
+ "ed.parent JOIN ed.child ON ed.parent.id = ed.child.parent_id")
def test_subquery(self):
t = table('sometable', column('col1'), column('col2'))
s = select([s.c.col1, s.c.col2])
self.assert_compile(s, "SELECT col1, col2 FROM (SELECT "
- "sometable.col1 AS col1, sometable.col2 AS col2 FROM sometable)")
+ "sometable.col1 AS col1, sometable.col2 "
+ "AS col2 FROM sometable)")
def test_limit(self):
t = table('sometable', column('col1'), column('col2'))
s = select([t]).limit(10).offset(20)
- self.assert_compile(s, "SELECT col1, col2 FROM (SELECT col1, col2, ROWNUM AS ora_rn "
+ self.assert_compile(s,
+ "SELECT col1, col2 FROM (SELECT col1, col2, ROWNUM AS ora_rn "
"FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2 "
"FROM sometable) WHERE ROWNUM <= :ROWNUM_1) WHERE ora_rn > :ora_rn_1"
)
anon = a_table.alias()
self.assert_compile(
-
select([other_table, anon]).select_from(
other_table.outerjoin(anon)
).apply_labels(),
"SELECT other_thirty_characters_table_.id AS other_thirty_characters__1, "
- "other_thirty_characters_table_.thirty_characters_table_id AS other_thirty_characters__2, "
- "thirty_characters_table__1.id AS thirty_characters_table__3 FROM other_thirty_characters_table_ "
- "LEFT OUTER JOIN thirty_characters_table_xxxxxx AS thirty_characters_table__1 ON "
- "thirty_characters_table__1.id = other_thirty_characters_table_.thirty_characters_table_id",
+ "other_thirty_characters_table_.thirty_characters_table_id AS "
+ "other_thirty_characters__2, "
+ "thirty_characters_table__1.id AS thirty_characters_table__3 FROM "
+ "other_thirty_characters_table_ "
+ "LEFT OUTER JOIN thirty_characters_table_xxxxxx AS thirty_characters_table__1 "
+ "ON thirty_characters_table__1.id = "
+ "other_thirty_characters_table_.thirty_characters_table_id",
dialect=dialect
)
self.assert_compile(
other_table.outerjoin(anon)
).apply_labels(),
"SELECT other_thirty_characters_table_.id AS other_thirty_characters__1, "
- "other_thirty_characters_table_.thirty_characters_table_id AS other_thirty_characters__2, "
- "thirty_characters_table__1.id AS thirty_characters_table__3 FROM other_thirty_characters_table_ "
+ "other_thirty_characters_table_.thirty_characters_table_id AS "
+ "other_thirty_characters__2, "
+ "thirty_characters_table__1.id AS thirty_characters_table__3 FROM "
+ "other_thirty_characters_table_ "
"LEFT OUTER JOIN thirty_characters_table_xxxxxx thirty_characters_table__1 ON "
- "thirty_characters_table__1.id = other_thirty_characters_table_.thirty_characters_table_id",
+ "thirty_characters_table__1.id = "
+ "other_thirty_characters_table_.thirty_characters_table_id",
dialect=ora_dialect
)
"SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, "
"myothertable.othername FROM mytable, myothertable WHERE "
"(mytable.name = :name_1 OR mytable.myid = :myid_1 OR "
- "myothertable.othername != :othername_1 OR EXISTS (select yay from foo where boo = lar)) "
+ "myothertable.othername != :othername_1 OR EXISTS (select yay "
+ "from foo where boo = lar)) "
"AND mytable.myid = myothertable.otherid(+)",
dialect=oracle.OracleDialect(use_ansi = False))
- query = table1.outerjoin(table2, table1.c.myid==table2.c.otherid).outerjoin(table3, table3.c.userid==table2.c.otherid)
- self.assert_compile(query.select(), "SELECT mytable.myid, mytable.name, mytable.description, "
- "myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff "
- "FROM mytable LEFT OUTER JOIN myothertable ON mytable.myid = myothertable.otherid LEFT OUTER "
- "JOIN thirdtable ON thirdtable.userid = myothertable.otherid")
- self.assert_compile(query.select(), "SELECT mytable.myid, mytable.name, mytable.description, "
- "myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM "
- "mytable, myothertable, thirdtable WHERE thirdtable.userid(+) = myothertable.otherid AND "
- "mytable.myid = myothertable.otherid(+)", dialect=oracle.dialect(use_ansi=False))
-
- query = table1.join(table2, table1.c.myid==table2.c.otherid).join(table3, table3.c.userid==table2.c.otherid)
- self.assert_compile(query.select(), "SELECT mytable.myid, mytable.name, mytable.description, "
- "myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM "
- "mytable, myothertable, thirdtable WHERE thirdtable.userid = myothertable.otherid AND "
+ query = table1.outerjoin(table2, table1.c.myid==table2.c.otherid).\
+ outerjoin(table3, table3.c.userid==table2.c.otherid)
+ self.assert_compile(query.select(),
+ "SELECT mytable.myid, mytable.name, mytable.description, "
+ "myothertable.otherid, myothertable.othername, thirdtable.userid,"
+ " thirdtable.otherstuff "
+ "FROM mytable LEFT OUTER JOIN myothertable ON mytable.myid ="
+ " myothertable.otherid LEFT OUTER "
+ "JOIN thirdtable ON thirdtable.userid = myothertable.otherid")
+
+ self.assert_compile(query.select(),
+ "SELECT mytable.myid, mytable.name, mytable.description, "
+ "myothertable.otherid, myothertable.othername, thirdtable.userid,"
+ " thirdtable.otherstuff FROM "
+ "mytable, myothertable, thirdtable WHERE thirdtable.userid(+) ="
+ " myothertable.otherid AND "
+ "mytable.myid = myothertable.otherid(+)",
+ dialect=oracle.dialect(use_ansi=False))
+
+ query = table1.join(table2, table1.c.myid==table2.c.otherid).\
+ join(table3, table3.c.userid==table2.c.otherid)
+ self.assert_compile(query.select(),
+ "SELECT mytable.myid, mytable.name, mytable.description, "
+ "myothertable.otherid, myothertable.othername, thirdtable.userid, "
+ "thirdtable.otherstuff FROM "
+ "mytable, myothertable, thirdtable WHERE thirdtable.userid = "
+ "myothertable.otherid AND "
"mytable.myid = myothertable.otherid", dialect=oracle.dialect(use_ansi=False))
- query = table1.join(table2, table1.c.myid==table2.c.otherid).outerjoin(table3, table3.c.userid==table2.c.otherid)
+ query = table1.join(table2, table1.c.myid==table2.c.otherid).\
+ outerjoin(table3, table3.c.userid==table2.c.otherid)
self.assert_compile(query.select().order_by(table1.c.name).limit(10).offset(5),
"SELECT myid, name, description, otherid, othername, userid, "
"otherstuff FROM (SELECT myid, name, description, "
- "otherid, othername, userid, otherstuff, ROWNUM AS ora_rn FROM (SELECT "
- "mytable.myid AS myid, mytable.name AS name, mytable.description AS description, "
- "myothertable.otherid AS otherid, myothertable.othername AS othername, "
- "thirdtable.userid AS userid, thirdtable.otherstuff AS otherstuff FROM mytable, "
- "myothertable, thirdtable WHERE thirdtable.userid(+) = myothertable.otherid AND "
- "mytable.myid = myothertable.otherid ORDER BY mytable.name) WHERE "
- "ROWNUM <= :ROWNUM_1) WHERE ora_rn > :ora_rn_1", dialect=oracle.dialect(use_ansi=False))
+ "otherid, othername, userid, otherstuff, "
+ "ROWNUM AS ora_rn FROM (SELECT "
+ "mytable.myid AS myid, mytable.name AS name, "
+ "mytable.description AS description, "
+ "myothertable.otherid AS otherid, myothertable.othername "
+ "AS othername, "
+ "thirdtable.userid AS userid, thirdtable.otherstuff AS "
+ "otherstuff FROM mytable, "
+ "myothertable, thirdtable WHERE thirdtable.userid(+) = "
+ "myothertable.otherid AND "
+ "mytable.myid = myothertable.otherid ORDER BY "
+ "mytable.name) WHERE "
+ "ROWNUM <= :ROWNUM_1) WHERE ora_rn > :ora_rn_1",
+ dialect=oracle.dialect(use_ansi=False))
subq = select([table1]).\
- select_from(table1.outerjoin(table2, table1.c.myid==table2.c.otherid)).alias()
- q = select([table3]).select_from(table3.outerjoin(subq, table3.c.userid==subq.c.myid))
+ select_from(
+ table1.outerjoin(table2, table1.c.myid==table2.c.otherid)
+ ).alias()
+ q = select([table3]).select_from(
+ table3.outerjoin(subq, table3.c.userid==subq.c.myid)
+ )
self.assert_compile(q, "SELECT thirdtable.userid, thirdtable.otherstuff "
- "FROM thirdtable LEFT OUTER JOIN (SELECT mytable.myid AS myid, mytable.name"
+ "FROM thirdtable LEFT OUTER JOIN (SELECT mytable.myid AS "
+ "myid, mytable.name"
" AS name, mytable.description AS description "
"FROM mytable LEFT OUTER JOIN myothertable ON mytable.myid = "
"myothertable.otherid) anon_1 ON thirdtable.userid = anon_1.myid",
- dialect=oracle.dialect(use_ansi=True))
+ dialect=oracle.dialect(use_ansi=True))
self.assert_compile(q, "SELECT thirdtable.userid, thirdtable.otherstuff "
- "FROM thirdtable, (SELECT mytable.myid AS myid, mytable.name AS name, "
- "mytable.description AS description FROM mytable, myothertable "
- "WHERE mytable.myid = myothertable.otherid(+)) anon_1 "
- "WHERE thirdtable.userid = anon_1.myid(+)",
- dialect=oracle.dialect(use_ansi=False))
+ "FROM thirdtable, (SELECT mytable.myid AS myid, mytable.name AS name, "
+ "mytable.description AS description FROM mytable, myothertable "
+ "WHERE mytable.myid = myothertable.otherid(+)) anon_1 "
+ "WHERE thirdtable.userid = anon_1.myid(+)",
+ dialect=oracle.dialect(use_ansi=False))
def test_alias_outer_join(self):
address_types = table('address_types',
at_alias = address_types.alias()
s = select([at_alias, addresses]).\
- select_from(addresses.outerjoin(at_alias, addresses.c.address_type_id==at_alias.c.id)).\
+ select_from(
+ addresses.outerjoin(at_alias,
+ addresses.c.address_type_id==at_alias.c.id)
+ ).\
where(addresses.c.user_id==7).\
order_by(addresses.c.id, address_types.c.id)
- self.assert_compile(s, "SELECT address_types_1.id, address_types_1.name, addresses.id, addresses.user_id, "
- "addresses.address_type_id, addresses.email_address FROM addresses LEFT OUTER JOIN address_types address_types_1 "
- "ON addresses.address_type_id = address_types_1.id WHERE addresses.user_id = :user_id_1 ORDER BY addresses.id, "
- "address_types.id")
+ self.assert_compile(s,
+ "SELECT address_types_1.id, address_types_1.name, addresses.id, "
+ "addresses.user_id, "
+ "addresses.address_type_id, addresses.email_address FROM addresses "
+ "LEFT OUTER JOIN address_types address_types_1 "
+ "ON addresses.address_type_id = address_types_1.id WHERE "
+ "addresses.user_id = :user_id_1 ORDER BY addresses.id, "
+ "address_types.id")
def test_compound(self):
t1 = table('t1', column('c1'), column('c2'), column('c3'), )
create synonym test_schema.ptable for test_schema.parent;
create synonym test_schema.ctable for test_schema.child;
--- can't make a ref from local schema to the remote schema's table without this,
--- *and* cant give yourself a grant ! so we give it to public. ideas welcome.
+-- can't make a ref from local schema to the
+-- remote schema's table without this,
+-- *and* cant give yourself a grant !
+-- so we give it to public. ideas welcome.
grant references on test_schema.parent to public;
grant references on test_schema.child to public;
""".split(";"):
parent = Table('parent', meta, autoload=True, schema='test_schema')
child = Table('child', meta, autoload=True, schema='test_schema')
- self.assert_compile(parent.join(child), "test_schema.parent JOIN test_schema.child ON test_schema.parent.id = test_schema.child.parent_id")
- select([parent, child]).select_from(parent.join(child)).execute().fetchall()
+ self.assert_compile(parent.join(child),
+ "test_schema.parent JOIN test_schema.child ON "
+ "test_schema.parent.id = test_schema.child.parent_id")
+ select([parent, child]).\
+ select_from(parent.join(child)).\
+ execute().fetchall()
def test_reflect_local_to_remote(self):
testing.db.execute("CREATE TABLE localtable "
meta = MetaData(testing.db)
lcl = Table('localtable', meta, autoload=True)
parent = meta.tables['test_schema.parent']
- self.assert_compile(parent.join(lcl), "test_schema.parent JOIN localtable ON test_schema.parent.id = localtable.parent_id")
- select([parent, lcl]).select_from(parent.join(lcl)).execute().fetchall()
+ self.assert_compile(parent.join(lcl),
+ "test_schema.parent JOIN localtable ON "
+ "test_schema.parent.id = localtable.parent_id")
+ select([parent, lcl]).\
+ select_from(parent.join(lcl)).\
+ execute().fetchall()
finally:
testing.db.execute("DROP TABLE localtable")
parent = Table('parent', meta, autoload=True, schema='test_schema')
child = Table('child', meta, autoload=True, schema='test_schema')
- self.assert_compile(parent.join(child), "test_schema.parent JOIN test_schema.child ON test_schema.parent.id = test_schema.child.parent_id")
+ self.assert_compile(parent.join(child),
+ "test_schema.parent JOIN test_schema.child ON "
+ "test_schema.parent.id = test_schema.child.parent_id")
select([parent, child]).select_from(parent.join(child)).execute().fetchall()
def test_reflect_alt_owner_synonyms(self):
meta = MetaData(testing.db)
lcl = Table('localtable', meta, autoload=True, oracle_resolve_synonyms=True)
parent = meta.tables['test_schema.ptable']
- self.assert_compile(parent.join(lcl), "test_schema.ptable JOIN localtable ON test_schema.ptable.id = localtable.parent_id")
+ self.assert_compile(parent.join(lcl),
+ "test_schema.ptable JOIN localtable ON "
+ "test_schema.ptable.id = localtable.parent_id")
select([parent, lcl]).select_from(parent.join(lcl)).execute().fetchall()
finally:
testing.db.execute("DROP TABLE localtable")
def test_reflect_remote_synonyms(self):
meta = MetaData(testing.db)
- parent = Table('ptable', meta, autoload=True, schema='test_schema', oracle_resolve_synonyms=True)
- child = Table('ctable', meta, autoload=True, schema='test_schema', oracle_resolve_synonyms=True)
- self.assert_compile(parent.join(child), "test_schema.ptable JOIN test_schema.ctable ON test_schema.ptable.id = test_schema.ctable.parent_id")
+ parent = Table('ptable', meta, autoload=True,
+ schema='test_schema',
+ oracle_resolve_synonyms=True)
+ child = Table('ctable', meta, autoload=True,
+ schema='test_schema',
+ oracle_resolve_synonyms=True)
+ self.assert_compile(parent.join(child),
+ "test_schema.ptable JOIN test_schema.ctable ON "
+ "test_schema.ptable.id = test_schema.ctable.parent_id")
select([parent, child]).select_from(parent.join(child)).execute().fetchall()
class ConstraintTest(TestBase):
__only_on__ = 'oracle'
- def test_oracle_has_no_on_update_cascade(self):
- m = MetaData(testing.db)
+ def setup(self):
+ global metadata
+ metadata = MetaData(testing.db)
- foo = Table('foo', m,
+ foo = Table('foo', metadata,
Column('id', Integer, primary_key=True),
)
foo.create(checkfirst=True)
- try:
- bar = Table('bar', m,
- Column('id', Integer, primary_key=True),
- Column('foo_id', Integer, ForeignKey('foo.id', onupdate="CASCADE"))
- )
- assert_raises(exc.SAWarning, bar.create)
+
+ def teardown(self):
+ metadata.drop_all()
- bat = Table('bat', m,
- Column('id', Integer, primary_key=True),
- Column('foo_id', Integer),
- ForeignKeyConstraint(['foo_id'], ['foo.id'], onupdate="CASCADE")
- )
- assert_raises(exc.SAWarning, bat.create)
-
- finally:
- m.drop_all()
+ def test_oracle_has_no_on_update_cascade(self):
+ bar = Table('bar', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('foo_id', Integer, ForeignKey('foo.id', onupdate="CASCADE"))
+ )
+ assert_raises(exc.SAWarning, bar.create)
+
+ bat = Table('bat', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('foo_id', Integer),
+ ForeignKeyConstraint(['foo_id'], ['foo.id'], onupdate="CASCADE")
+ )
+ assert_raises(exc.SAWarning, bat.create)
class TypesTest(TestBase, AssertsCompiledSQL):
__dialect__ = oracle.OracleDialect()
def test_no_clobs_for_string_params(self):
- """test that simple string params get a DBAPI type of VARCHAR, not CLOB.
- this is to prevent setinputsizes from setting up cx_oracle.CLOBs on
+ """test that simple string params get a DBAPI type of
+ VARCHAR, not CLOB. This is to prevent setinputsizes
+ from setting up cx_oracle.CLOBs on
string-based bind params [ticket:793]."""
class FakeDBAPI(object):
(NCHAR(), cx_oracle._OracleNVarChar),
(oracle.RAW(50), cx_oracle._OracleRaw),
]:
- assert isinstance(start.dialect_impl(dialect), test), "wanted %r got %r" % (test, start.dialect_impl(dialect))
+ assert isinstance(start.dialect_impl(dialect), test), \
+ "wanted %r got %r" % (test, start.dialect_impl(dialect))
@testing.requires.returning
def test_int_not_float(self):
finally:
t1.drop()
- @testing.fails_on('+zxjdbc', 'Not yet known how to pass values of the INTERVAL type')
+ @testing.fails_on('+zxjdbc',
+ 'Not yet known how to pass values of the INTERVAL type')
def test_interval(self):
for type_, expected in [
(oracle.INTERVAL(), "INTERVAL DAY TO SECOND"),
- (oracle.INTERVAL(day_precision=3), "INTERVAL DAY(3) TO SECOND"),
- (oracle.INTERVAL(second_precision=5), "INTERVAL DAY TO SECOND(5)"),
- (oracle.INTERVAL(day_precision=2, second_precision=5), "INTERVAL DAY(2) TO SECOND(5)"),
+ (
+ oracle.INTERVAL(day_precision=3),
+ "INTERVAL DAY(3) TO SECOND"
+ ),
+ (
+ oracle.INTERVAL(second_precision=5),
+ "INTERVAL DAY TO SECOND(5)"
+ ),
+ (
+ oracle.INTERVAL(day_precision=2, second_precision=5),
+ "INTERVAL DAY(2) TO SECOND(5)"
+ ),
]:
self.assert_compile(type_, expected)
metadata.drop_all()
def test_reflect_raw(self):
- types_table = Table(
- 'all_types', MetaData(testing.db),
+ types_table = Table('all_types', MetaData(testing.db),
Column('owner', String(30), primary_key=True),
Column('type_name', String(30), primary_key=True),
autoload=True, oracle_resolve_synonyms=True
)
- [[row[k] for k in row.keys()] for row in types_table.select().execute().fetchall()]
+ for row in types_table.select().execute().fetchall():
+ [row[k] for k in row.keys()]
def test_reflect_nvarchar(self):
metadata = MetaData(testing.db)
Column('data', Text), Column('bindata', LargeBinary))
t.create(engine)
try:
- engine.execute(t.insert(), id=1, data='this is text', bindata='this is binary')
+ engine.execute(t.insert(), id=1,
+ data='this is text',
+ bindata='this is binary')
row = engine.execute(t.select()).first()
eq_(row['data'].read(), 'this is text')
eq_(row['bindata'].read(), 'this is binary')
Column('data', LargeBinary)
)
meta.create_all()
- stream = os.path.join(os.path.dirname(__file__), "..", 'binary_data_one.dat')
+ stream = os.path.join(
+ os.path.dirname(__file__), "..",
+ 'binary_data_one.dat')
stream = file(stream).read(12000)
for i in range(1, 11):
def setup(self):
global metadata
metadata = MetaData(testing.db)
- t1 = Table('test_index_reflect', metadata, Column('data', String(20), primary_key=True))
+ t1 = Table('test_index_reflect', metadata,
+ Column('data', String(20), primary_key=True)
+ )
metadata.create_all()
def teardown(self):