super(OracleExecutionContext, self).pre_exec()
if self.dialect.auto_setinputsizes:
self.set_input_sizes()
- if self.compiled_parameters is not None and not isinstance(self.compiled_parameters, list):
- for key in self.compiled_parameters:
- (bindparam, name, value) = self.compiled_parameters.get_parameter(key)
+ if self.compiled_parameters is not None and len(self.compiled_parameters) == 1:
+ for key in self.compiled_parameters[0]:
+ (bindparam, name, value) = self.compiled_parameters[0].get_parameter(key)
if bindparam.isoutparam:
dbtype = bindparam.type.dialect_impl(self.dialect).get_dbapi_type(self.dialect.dbapi)
if not hasattr(self, 'out_parameters'):
self.out_parameters = {}
self.out_parameters[name] = self.cursor.var(dbtype)
- self.parameters[name] = self.out_parameters[name]
+ self.parameters[0][name] = self.out_parameters[name]
def get_result_proxy(self):
if hasattr(self, 'out_parameters'):
- if self.compiled_parameters is not None:
+ if self.compiled_parameters is not None and len(self.compiled_parameters) == 1:
for k in self.out_parameters:
- type = self.compiled_parameters.get_type(k)
+ type = self.compiled_parameters[0].get_type(k)
self.out_parameters[k] = type.dialect_impl(self.dialect).result_processor(self.dialect)(self.out_parameters[k].getvalue())
else:
for k in self.out_parameters:
if self.dbapi is None or not self.auto_convert_lobs:
return {}
else:
+ # only use this for LOB objects. using it for strings, dates
+ # etc. leads to a little too much magic, reflection doesn't know if it should
+ # expect encoded strings or unicodes, etc.
return {
- self.dbapi.NUMBER: OracleInteger(),
self.dbapi.CLOB: OracleText(),
self.dbapi.BLOB: OracleBinary(),
- self.dbapi.STRING: OracleString(),
- self.dbapi.TIMESTAMP: OracleTimestamp(),
self.dbapi.BINARY: OracleRaw(),
- datetime.datetime: OracleDate()
}
def dbapi(cls):
def _normalize_name(self, name):
if name is None:
return None
- elif name.upper() == name and not self.identifier_preparer._requires_quotes(name.lower()):
- return name.lower()
+ elif name.upper() == name and not self.identifier_preparer._requires_quotes(name.lower().decode(self.encoding)):
+ return name.lower().decode(self.encoding)
else:
- return name
+ return name.decode(self.encoding)
def _denormalize_name(self, name):
if name is None:
return None
elif name.lower() == name and not self.identifier_preparer._requires_quotes(name.lower()):
- return name.upper()
+ return name.upper().encode(self.encoding)
else:
- return name
+ return name.encode(self.encoding)
def table_names(self, connection, schema):
# note that table_names() isnt loading DBLINKed or synonym'ed tables
finally:
table.drop()
+ @testing.unsupported('oracle')
def testreserved(self):
# check a table that uses an SQL reserved name doesn't cause an error
meta = MetaData(testbase.db)
class UnicodeTest(PersistTest):
def test_basic(self):
try:
- bind = engines.utf8_engine()
+ # the 'convert_unicode' should not get in the way of the reflection
+ # process. reflecttable for oracle, postgres (others?) expect non-unicode
+ # strings in result sets/bind params
+ bind = engines.utf8_engine(options={'convert_unicode':True})
metadata = MetaData(bind)
names = set([u'plain', u'Unit\u00e9ble', u'\u6e2c\u8a66'])
else:
self.assert_(not isinstance(x['plain_varchar'], unicode) and x['plain_varchar'] == rawdata)
+ @testing.unsupported('oracle')
def testblanks(self):
unicode_table.insert().execute(unicode_varchar=u'')
assert select([unicode_table.c.unicode_varchar]).scalar() == u''
self.assert_(isinstance(x[0][0], datetime.datetime))
x = testbase.db.text(
- "select * from query_users_with_date where user_datetime=:date",
- bindparams=[bindparam('date', type_=types.DateTime)]).execute(
- date=datetime.datetime(2005, 11, 10, 11, 52, 35)).fetchall()
+ "select * from query_users_with_date where user_datetime=:somedate",
+ bindparams=[bindparam('somedate', type_=types.DateTime)]).execute(
+ somedate=datetime.datetime(2005, 11, 10, 11, 52, 35)).fetchall()
print repr(x)
def testdate2(self):