in other owner namespaces with the same name do not
conflict. [ticket:709]
+ - Cursors now have "arraysize" set to 50 by default, the
+ value of which is configurable using the "arraysize"
+ argument to create_engine() with the Oracle dialect.
+ This is to account for cx_oracle's default setting of
+ "1", which results in many round trips being sent to
+ Oracle. This works well in conjunction with
+ BLOB/CLOB-bound cursors, of which any number are
+ available, but only for the life of that row request
+ (so BufferedColumnRow is still needed, but less so).
+ [ticket:1062]
+
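For illustration, a minimal usage sketch of the new create_engine()
argument follows; the connection URL and the value 500 are
placeholders, not part of the change:

    from sqlalchemy import create_engine

    # default: Oracle cursors are created with arraysize=50
    engine = create_engine('oracle://scott:tiger@localhost/xe')

    # a larger arraysize trades memory for fewer fetch round trips
    engine = create_engine('oracle://scott:tiger@localhost/xe', arraysize=500)
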
0.4.6
=====
self.out_parameters[name] = self.cursor.var(dbtype)
self.parameters[0][name] = self.out_parameters[name]
+ def create_cursor(self):
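+ # apply the dialect's configured arraysize to the underlying cx_oracle cursor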
+ c = self._connection.connection.cursor()
+ if self.dialect.arraysize:
+ c.cursor.arraysize = self.dialect.arraysize
+ return c
+
def get_result_proxy(self):
if hasattr(self, 'out_parameters'):
if self.compiled_parameters is not None and len(self.compiled_parameters) == 1:
supports_pk_autoincrement = False
default_paramstyle = 'named'
- def __init__(self, use_ansi=True, auto_setinputsizes=True, auto_convert_lobs=True, threaded=True, allow_twophase=True, **kwargs):
+ def __init__(self, use_ansi=True, auto_setinputsizes=True, auto_convert_lobs=True, threaded=True, allow_twophase=True, arraysize=50, **kwargs):
default.DefaultDialect.__init__(self, **kwargs)
self.use_ansi = use_ansi
self.threaded = threaded
+ self.arraysize = arraysize
self.allow_twophase = allow_twophase
self.supports_timestamp = self.dbapi is None or hasattr(self.dbapi, 'TIMESTAMP' )
self.auto_setinputsizes = auto_setinputsizes
from sqlalchemy.sql import table, column
from sqlalchemy.databases import oracle
from testlib import *
+from testlib.engines import testing_engine
+import os
class OutParamTest(TestBase, AssertsExecutionResults):
finally:
testing.db.execute("DROP TABLE Z_TEST")
+class BufferedColumnTest(TestBase, AssertsCompiledSQL):
+ __only_on__ = 'oracle'
+
+ def setUpAll(self):
+ global binary_table, stream, meta
+ meta = MetaData(testing.db)
+ binary_table = Table('binary_table', meta,
+ Column('id', Integer, primary_key=True),
+ Column('data', Binary)
+ )
+ meta.create_all()
+ stream = os.path.join(os.path.dirname(testenv.__file__), 'binary_data_one.dat')
+ stream = file(stream).read(12000)
+
+ for i in range(1, 11):
+ binary_table.insert().execute(id=i, data=stream)
+
+ def tearDownAll(self):
+ meta.drop_all()
+
+ def test_fetch(self):
+ self.assertEquals(
+ binary_table.select().execute().fetchall(),
+ [(i, stream) for i in range(1, 11)],
+ )
+
+ def test_fetch_single_arraysize(self):
+ eng = testing_engine(options={'arraysize':1})
+ self.assertEquals(
+ eng.execute(binary_table.select()).fetchall(),
+ [(i, stream) for i in range(1, 11)],
+ )
+
class SequenceTest(TestBase, AssertsCompiledSQL):
def test_basic(self):
seq = Sequence("my_seq_no_schema")