class _OracleUnicodeStringCHAR(sqltypes.Unicode):
def get_dbapi_type(self, dbapi):
- return None
+ return dbapi.LONG_STRING
class _OracleUnicodeTextNCLOB(oracle.NCLOB):
if bindparam.isoutparam:
name = self.compiled.bind_names[bindparam]
type_impl = bindparam.type.dialect_impl(self.dialect)
+
if hasattr(type_impl, "_cx_oracle_var"):
self.out_parameters[name] = type_impl._cx_oracle_var(
self.dialect, self.cursor
)
else:
dbtype = type_impl.get_dbapi_type(self.dialect.dbapi)
+
+ cx_Oracle = self.dialect.dbapi
+
if dbtype is None:
raise exc.InvalidRequestError(
- "Cannot create out parameter for parameter "
+ "Cannot create out parameter for "
+ "parameter "
"%r - its type %r is not supported by"
" cx_oracle" % (bindparam.key, bindparam.type)
)
- self.out_parameters[name] = self.cursor.var(dbtype)
+
+ if compat.py2k and dbtype in (
+ cx_Oracle.CLOB,
+ cx_Oracle.NCLOB,
+ ):
+ outconverter = (
+ processors.to_unicode_processor_factory(
+ self.dialect.encoding,
+ errors=self.dialect.encoding_errors,
+ )
+ )
+ self.out_parameters[name] = self.cursor.var(
+ dbtype,
+ outconverter=lambda value: outconverter(
+ value.read()
+ ),
+ )
+
+ elif dbtype in (
+ cx_Oracle.BLOB,
+ cx_Oracle.CLOB,
+ cx_Oracle.NCLOB,
+ ):
+ self.out_parameters[name] = self.cursor.var(
+ dbtype, outconverter=lambda value: value.read()
+ )
+ elif compat.py2k and isinstance(
+ type_impl, sqltypes.Unicode
+ ):
+ outconverter = (
+ processors.to_unicode_processor_factory(
+ self.dialect.encoding,
+ errors=self.dialect.encoding_errors,
+ )
+ )
+ self.out_parameters[name] = self.cursor.var(
+ dbtype, outconverter=outconverter
+ )
+ else:
+ self.out_parameters[name] = self.cursor.var(dbtype)
self.parameters[0][
quoted_bind_names.get(name, name)
] = self.out_parameters[name]
import datetime
import decimal
import os
+import random
from sqlalchemy import bindparam
from sqlalchemy import cast
self.data,
)
+ @testing.combinations(
+ (UnicodeText(),), (Text(),), (LargeBinary(),), argnames="datatype"
+ )
+ @testing.combinations((10,), (100,), (250,), argnames="datasize")
+ @testing.combinations(
+ ("x,y,z"), ("y"), ("y,x,z"), ("x,z,y"), argnames="retcols"
+ )
+ def test_insert_returning_w_lobs(
+ self, datatype, datasize, retcols, metadata, connection
+ ):
+ long_text = Table(
+ "long_text",
+ metadata,
+ Column("x", Integer),
+ Column("y", datatype),
+ Column("z", Integer),
+ )
+ long_text.create(connection)
+
+ if isinstance(datatype, UnicodeText):
+ word_seed = u"ab🐍’«cdefg"
+ else:
+ word_seed = "abcdef"
+
+ some_text = u" ".join(
+ "".join(random.choice(word_seed) for j in range(150))
+ for i in range(datasize)
+ )
+ if isinstance(datatype, LargeBinary):
+ some_text = some_text.encode("ascii")
+
+ data = {"x": 5, "y": some_text, "z": 10}
+ return_columns = [long_text.c[col] for col in retcols.split(",")]
+ expected = tuple(data[col] for col in retcols.split(","))
+ result = connection.execute(
+ long_text.insert().returning(*return_columns),
+ data,
+ )
+
+ eq_(result.fetchall(), [expected])
+
+ def test_insert_returning_w_unicode(self, metadata, connection):
+ long_text = Table(
+ "long_text",
+ metadata,
+ Column("x", Integer),
+ Column("y", Unicode(255)),
+ )
+ long_text.create(connection)
+
+ word_seed = u"ab🐍’«cdefg"
+
+ some_text = u" ".join(
+ "".join(random.choice(word_seed) for j in range(10))
+ for i in range(15)
+ )
+
+ data = {"x": 5, "y": some_text}
+ result = connection.execute(
+ long_text.insert().returning(long_text.c.y),
+ data,
+ )
+
+ eq_(result.fetchall(), [(some_text,)])
+
def test_large_stream(self, connection):
binary_table = self.tables.binary_table
result = connection.execute(