)
-class ResultProxyTest(fixtures.TestBase):
- __backend__ = True
-
- def test_nontuple_row(self):
- """ensure the C version of BaseRowProxy handles
- duck-type-dependent rows."""
-
- from sqlalchemy.engine import RowProxy
-
- class MyList(object):
-
- def __init__(self, l):
- self.l = l
-
- def __len__(self):
- return len(self.l)
-
- def __getitem__(self, i):
- return list.__getitem__(self.l, i)
-
- proxy = RowProxy(object(), MyList(['value']), [None], {
- 'key': (None, None, 0), 0: (None, None, 0)})
- eq_(list(proxy), ['value'])
- eq_(proxy[0], 'value')
- eq_(proxy['key'], 'value')
-
- @testing.provide_metadata
- def test_no_rowcount_on_selects_inserts(self):
- """assert that rowcount is only called on deletes and updates.
-
- This because cursor.rowcount may can be expensive on some dialects
- such as Firebird, however many dialects require it be called
- before the cursor is closed.
-
- """
-
- metadata = self.metadata
-
- engine = engines.testing_engine()
-
- t = Table('t1', metadata,
- Column('data', String(10))
- )
- metadata.create_all(engine)
-
- with patch.object(
- engine.dialect.execution_ctx_cls, "rowcount") as mock_rowcount:
- mock_rowcount.__get__ = Mock()
- engine.execute(t.insert(),
- {'data': 'd1'},
- {'data': 'd2'},
- {'data': 'd3'})
-
- eq_(len(mock_rowcount.__get__.mock_calls), 0)
-
- eq_(
- engine.execute(t.select()).fetchall(),
- [('d1', ), ('d2', ), ('d3', )]
- )
- eq_(len(mock_rowcount.__get__.mock_calls), 0)
-
- engine.execute(t.update(), {'data': 'd4'})
-
- eq_(len(mock_rowcount.__get__.mock_calls), 1)
-
- engine.execute(t.delete())
- eq_(len(mock_rowcount.__get__.mock_calls), 2)
-
- def test_rowproxy_is_sequence(self):
- import collections
- from sqlalchemy.engine import RowProxy
-
- row = RowProxy(
- object(), ['value'], [None],
- {'key': (None, None, 0), 0: (None, None, 0)})
- assert isinstance(row, collections.Sequence)
-
- @testing.provide_metadata
- def test_rowproxy_getitem_indexes_compiled(self):
- values = Table('users', self.metadata,
- Column('key', String(10), primary_key=True),
- Column('value', String(10)))
- values.create()
-
- testing.db.execute(values.insert(), dict(key='One', value='Uno'))
- row = testing.db.execute(values.select()).first()
- eq_(row['key'], 'One')
- eq_(row['value'], 'Uno')
- eq_(row[0], 'One')
- eq_(row[1], 'Uno')
- eq_(row[-2], 'One')
- eq_(row[-1], 'Uno')
- eq_(row[1:0:-1], ('Uno',))
-
- def test_rowproxy_getitem_indexes_raw(self):
- row = testing.db.execute("select 'One' as key, 'Uno' as value").first()
- eq_(row['key'], 'One')
- eq_(row['value'], 'Uno')
- eq_(row[0], 'One')
- eq_(row[1], 'Uno')
- eq_(row[-2], 'One')
- eq_(row[-1], 'Uno')
- eq_(row[1:0:-1], ('Uno',))
-
- @testing.requires.cextensions
- def test_row_c_sequence_check(self):
- import csv
-
- metadata = MetaData()
- metadata.bind = 'sqlite://'
- users = Table('users', metadata,
- Column('id', Integer, primary_key=True),
- Column('name', String(40)),
- )
- users.create()
-
- users.insert().execute(name='Test')
- row = users.select().execute().fetchone()
-
- s = util.StringIO()
- writer = csv.writer(s)
- # csv performs PySequenceCheck call
- writer.writerow(row)
- assert s.getvalue().strip() == '1,Test'
-
- @testing.requires.selectone
- def test_empty_accessors(self):
- statements = [
- (
- "select 1",
- [
- lambda r: r.last_inserted_params(),
- lambda r: r.last_updated_params(),
- lambda r: r.prefetch_cols(),
- lambda r: r.postfetch_cols(),
- lambda r: r.inserted_primary_key
- ],
- "Statement is not a compiled expression construct."
- ),
- (
- select([1]),
- [
- lambda r: r.last_inserted_params(),
- lambda r: r.inserted_primary_key
- ],
- r"Statement is not an insert\(\) expression construct."
- ),
- (
- select([1]),
- [
- lambda r: r.last_updated_params(),
- ],
- r"Statement is not an update\(\) expression construct."
- ),
- (
- select([1]),
- [
- lambda r: r.prefetch_cols(),
- lambda r: r.postfetch_cols()
- ],
- r"Statement is not an insert\(\) "
- r"or update\(\) expression construct."
- ),
- ]
-
- for stmt, meths, msg in statements:
- r = testing.db.execute(stmt)
- try:
- for meth in meths:
- assert_raises_message(
- tsa.exc.InvalidRequestError,
- msg,
- meth, r
- )
-
- finally:
- r.close()
-
class SchemaTranslateTest(fixtures.TestBase, testing.AssertsExecutionResults):
__requires__ = 'schemas',
__backend__ = True
)
-class AlternateResultProxyTest(fixtures.TablesTest):
- __requires__ = ('sqlite', )
-
- @classmethod
- def setup_bind(cls):
- cls.engine = engine = testing_engine('sqlite://')
- return engine
-
- @classmethod
- def define_tables(cls, metadata):
- Table(
- 'test', metadata,
- Column('x', Integer, primary_key=True),
- Column('y', String(50, convert_unicode='force'))
- )
-
- @classmethod
- def insert_data(cls):
- cls.engine.execute(cls.tables.test.insert(), [
- {'x': i, 'y': "t_%d" % i} for i in range(1, 12)
- ])
-
- @contextmanager
- def _proxy_fixture(self, cls):
- self.table = self.tables.test
-
- class ExcCtx(default.DefaultExecutionContext):
-
- def get_result_proxy(self):
- return cls(self)
- self.patcher = patch.object(
- self.engine.dialect, "execution_ctx_cls", ExcCtx)
- with self.patcher:
- yield
-
- def _test_proxy(self, cls):
- with self._proxy_fixture(cls):
- rows = []
- r = self.engine.execute(select([self.table]))
- assert isinstance(r, cls)
- for i in range(5):
- rows.append(r.fetchone())
- eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
-
- rows = r.fetchmany(3)
- eq_(rows, [(i, "t_%d" % i) for i in range(6, 9)])
-
- rows = r.fetchall()
- eq_(rows, [(i, "t_%d" % i) for i in range(9, 12)])
-
- r = self.engine.execute(select([self.table]))
- rows = r.fetchmany(None)
- eq_(rows[0], (1, "t_1"))
- # number of rows here could be one, or the whole thing
- assert len(rows) == 1 or len(rows) == 11
-
- r = self.engine.execute(select([self.table]).limit(1))
- r.fetchone()
- eq_(r.fetchone(), None)
-
- r = self.engine.execute(select([self.table]).limit(5))
- rows = r.fetchmany(6)
- eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
-
- # result keeps going just fine with blank results...
- eq_(r.fetchmany(2), [])
-
- eq_(r.fetchmany(2), [])
-
- eq_(r.fetchall(), [])
-
- eq_(r.fetchone(), None)
-
- # until we close
- r.close()
-
- self._assert_result_closed(r)
-
- r = self.engine.execute(select([self.table]).limit(5))
- eq_(r.first(), (1, "t_1"))
- self._assert_result_closed(r)
-
- r = self.engine.execute(select([self.table]).limit(5))
- eq_(r.scalar(), 1)
- self._assert_result_closed(r)
-
- def _assert_result_closed(self, r):
- assert_raises_message(
- tsa.exc.ResourceClosedError,
- "object is closed",
- r.fetchone
- )
-
- assert_raises_message(
- tsa.exc.ResourceClosedError,
- "object is closed",
- r.fetchmany, 2
- )
-
- assert_raises_message(
- tsa.exc.ResourceClosedError,
- "object is closed",
- r.fetchall
- )
-
- def test_plain(self):
- self._test_proxy(_result.ResultProxy)
-
- def test_buffered_row_result_proxy(self):
- self._test_proxy(_result.BufferedRowResultProxy)
-
- def test_fully_buffered_result_proxy(self):
- self._test_proxy(_result.FullyBufferedResultProxy)
-
- def test_buffered_column_result_proxy(self):
- self._test_proxy(_result.BufferedColumnResultProxy)
-
- def test_buffered_row_growth(self):
- with self._proxy_fixture(_result.BufferedRowResultProxy):
- with self.engine.connect() as conn:
- conn.execute(self.table.insert(), [
- {'x': i, 'y': "t_%d" % i} for i in range(15, 1200)
- ])
- result = conn.execute(self.table.select())
- checks = {
- 0: 5, 1: 10, 9: 20, 135: 250, 274: 500,
- 1351: 1000
- }
- for idx, row in enumerate(result, 0):
- if idx in checks:
- eq_(result._bufsize, checks[idx])
- le_(
- len(result._BufferedRowResultProxy__rowbuffer),
- 1000
- )
-
- def test_max_row_buffer_option(self):
- with self._proxy_fixture(_result.BufferedRowResultProxy):
- with self.engine.connect() as conn:
- conn.execute(self.table.insert(), [
- {'x': i, 'y': "t_%d" % i} for i in range(15, 1200)
- ])
- result = conn.execution_options(max_row_buffer=27).execute(
- self.table.select()
- )
- for idx, row in enumerate(result, 0):
- if idx in (16, 70, 150, 250):
- eq_(result._bufsize, 27)
- le_(
- len(result._BufferedRowResultProxy__rowbuffer),
- 27
- )
-
-
class EngineEventsTest(fixtures.TestBase):
__requires__ = 'ad_hoc_engines',
__backend__ = True
from sqlalchemy.testing import eq_, assert_raises_message, assert_raises, \
- in_, not_in_, is_, ne_
+ in_, not_in_, is_, ne_, le_
from sqlalchemy import testing
from sqlalchemy.testing import fixtures, engines
from sqlalchemy import util
from sqlalchemy.testing.schema import Table, Column
import operator
from sqlalchemy.testing import assertions
+from sqlalchemy import exc as sa_exc
+from sqlalchemy.testing.mock import patch, Mock
+from contextlib import contextmanager
+from sqlalchemy.engine import default
class ResultProxyTest(fixtures.TablesTest):
eq_(r['_parent'], 'Hidden parent')
eq_(r['_row'], 'Hidden row')
+ def test_nontuple_row(self):
+ """ensure the C version of BaseRowProxy handles
+ duck-type-dependent rows."""
+
+ from sqlalchemy.engine import RowProxy
+
+ class MyList(object):
+
+ def __init__(self, l):
+ self.l = l
+
+ def __len__(self):
+ return len(self.l)
+
+ def __getitem__(self, i):
+ return list.__getitem__(self.l, i)
+
+ proxy = RowProxy(object(), MyList(['value']), [None], {
+ 'key': (None, None, 0), 0: (None, None, 0)})
+ eq_(list(proxy), ['value'])
+ eq_(proxy[0], 'value')
+ eq_(proxy['key'], 'value')
+
+ @testing.provide_metadata
+ def test_no_rowcount_on_selects_inserts(self):
+ """assert that rowcount is only called on deletes and updates.
+
+ This because cursor.rowcount may can be expensive on some dialects
+ such as Firebird, however many dialects require it be called
+ before the cursor is closed.
+
+ """
+
+ metadata = self.metadata
+
+ engine = engines.testing_engine()
+
+ t = Table('t1', metadata,
+ Column('data', String(10))
+ )
+ metadata.create_all(engine)
+
+ with patch.object(
+ engine.dialect.execution_ctx_cls, "rowcount") as mock_rowcount:
+ mock_rowcount.__get__ = Mock()
+ engine.execute(t.insert(),
+ {'data': 'd1'},
+ {'data': 'd2'},
+ {'data': 'd3'})
+
+ eq_(len(mock_rowcount.__get__.mock_calls), 0)
+
+ eq_(
+ engine.execute(t.select()).fetchall(),
+ [('d1', ), ('d2', ), ('d3', )]
+ )
+ eq_(len(mock_rowcount.__get__.mock_calls), 0)
+
+ engine.execute(t.update(), {'data': 'd4'})
+
+ eq_(len(mock_rowcount.__get__.mock_calls), 1)
+
+ engine.execute(t.delete())
+ eq_(len(mock_rowcount.__get__.mock_calls), 2)
+
+ def test_rowproxy_is_sequence(self):
+ import collections
+ from sqlalchemy.engine import RowProxy
+
+ row = RowProxy(
+ object(), ['value'], [None],
+ {'key': (None, None, 0), 0: (None, None, 0)})
+ assert isinstance(row, collections.Sequence)
+
+ @testing.provide_metadata
+ def test_rowproxy_getitem_indexes_compiled(self):
+ values = Table('rp', self.metadata,
+ Column('key', String(10), primary_key=True),
+ Column('value', String(10)))
+ values.create()
+
+ testing.db.execute(values.insert(), dict(key='One', value='Uno'))
+ row = testing.db.execute(values.select()).first()
+ eq_(row['key'], 'One')
+ eq_(row['value'], 'Uno')
+ eq_(row[0], 'One')
+ eq_(row[1], 'Uno')
+ eq_(row[-2], 'One')
+ eq_(row[-1], 'Uno')
+ eq_(row[1:0:-1], ('Uno',))
+
+ def test_rowproxy_getitem_indexes_raw(self):
+ row = testing.db.execute("select 'One' as key, 'Uno' as value").first()
+ eq_(row['key'], 'One')
+ eq_(row['value'], 'Uno')
+ eq_(row[0], 'One')
+ eq_(row[1], 'Uno')
+ eq_(row[-2], 'One')
+ eq_(row[-1], 'Uno')
+ eq_(row[1:0:-1], ('Uno',))
+
+ @testing.requires.cextensions
+ def test_row_c_sequence_check(self):
+ import csv
+
+ metadata = MetaData()
+ metadata.bind = 'sqlite://'
+ users = Table('users', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('name', String(40)),
+ )
+ users.create()
+
+ users.insert().execute(name='Test')
+ row = users.select().execute().fetchone()
+
+ s = util.StringIO()
+ writer = csv.writer(s)
+ # csv performs PySequenceCheck call
+ writer.writerow(row)
+ assert s.getvalue().strip() == '1,Test'
+
+ @testing.requires.selectone
+ def test_empty_accessors(self):
+ statements = [
+ (
+ "select 1",
+ [
+ lambda r: r.last_inserted_params(),
+ lambda r: r.last_updated_params(),
+ lambda r: r.prefetch_cols(),
+ lambda r: r.postfetch_cols(),
+ lambda r: r.inserted_primary_key
+ ],
+ "Statement is not a compiled expression construct."
+ ),
+ (
+ select([1]),
+ [
+ lambda r: r.last_inserted_params(),
+ lambda r: r.inserted_primary_key
+ ],
+ r"Statement is not an insert\(\) expression construct."
+ ),
+ (
+ select([1]),
+ [
+ lambda r: r.last_updated_params(),
+ ],
+ r"Statement is not an update\(\) expression construct."
+ ),
+ (
+ select([1]),
+ [
+ lambda r: r.prefetch_cols(),
+ lambda r: r.postfetch_cols()
+ ],
+ r"Statement is not an insert\(\) "
+ r"or update\(\) expression construct."
+ ),
+ ]
+
+ for stmt, meths, msg in statements:
+ r = testing.db.execute(stmt)
+ try:
+ for meth in meths:
+ assert_raises_message(
+ sa_exc.InvalidRequestError,
+ msg,
+ meth, r
+ )
+
+ finally:
+ r.close()
+
+
class KeyTargetingTest(fixtures.TablesTest):
run_inserts = 'once'
"Could not locate column in row for column 'text1.a'",
lambda: row[text1.c.a]
)
+
+
+class AlternateResultProxyTest(fixtures.TablesTest):
+ __requires__ = ('sqlite', )
+
+ @classmethod
+ def setup_bind(cls):
+ cls.engine = engine = engines.testing_engine('sqlite://')
+ return engine
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ 'test', metadata,
+ Column('x', Integer, primary_key=True),
+ Column('y', String(50, convert_unicode='force'))
+ )
+
+ @classmethod
+ def insert_data(cls):
+ cls.engine.execute(cls.tables.test.insert(), [
+ {'x': i, 'y': "t_%d" % i} for i in range(1, 12)
+ ])
+
+ @contextmanager
+ def _proxy_fixture(self, cls):
+ self.table = self.tables.test
+
+ class ExcCtx(default.DefaultExecutionContext):
+
+ def get_result_proxy(self):
+ return cls(self)
+ self.patcher = patch.object(
+ self.engine.dialect, "execution_ctx_cls", ExcCtx)
+ with self.patcher:
+ yield
+
+ def _test_proxy(self, cls):
+ with self._proxy_fixture(cls):
+ rows = []
+ r = self.engine.execute(select([self.table]))
+ assert isinstance(r, cls)
+ for i in range(5):
+ rows.append(r.fetchone())
+ eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
+
+ rows = r.fetchmany(3)
+ eq_(rows, [(i, "t_%d" % i) for i in range(6, 9)])
+
+ rows = r.fetchall()
+ eq_(rows, [(i, "t_%d" % i) for i in range(9, 12)])
+
+ r = self.engine.execute(select([self.table]))
+ rows = r.fetchmany(None)
+ eq_(rows[0], (1, "t_1"))
+ # number of rows here could be one, or the whole thing
+ assert len(rows) == 1 or len(rows) == 11
+
+ r = self.engine.execute(select([self.table]).limit(1))
+ r.fetchone()
+ eq_(r.fetchone(), None)
+
+ r = self.engine.execute(select([self.table]).limit(5))
+ rows = r.fetchmany(6)
+ eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
+
+ # result keeps going just fine with blank results...
+ eq_(r.fetchmany(2), [])
+
+ eq_(r.fetchmany(2), [])
+
+ eq_(r.fetchall(), [])
+
+ eq_(r.fetchone(), None)
+
+ # until we close
+ r.close()
+
+ self._assert_result_closed(r)
+
+ r = self.engine.execute(select([self.table]).limit(5))
+ eq_(r.first(), (1, "t_1"))
+ self._assert_result_closed(r)
+
+ r = self.engine.execute(select([self.table]).limit(5))
+ eq_(r.scalar(), 1)
+ self._assert_result_closed(r)
+
+ def _assert_result_closed(self, r):
+ assert_raises_message(
+ sa_exc.ResourceClosedError,
+ "object is closed",
+ r.fetchone
+ )
+
+ assert_raises_message(
+ sa_exc.ResourceClosedError,
+ "object is closed",
+ r.fetchmany, 2
+ )
+
+ assert_raises_message(
+ sa_exc.ResourceClosedError,
+ "object is closed",
+ r.fetchall
+ )
+
+ def test_plain(self):
+ self._test_proxy(_result.ResultProxy)
+
+ def test_buffered_row_result_proxy(self):
+ self._test_proxy(_result.BufferedRowResultProxy)
+
+ def test_fully_buffered_result_proxy(self):
+ self._test_proxy(_result.FullyBufferedResultProxy)
+
+ def test_buffered_column_result_proxy(self):
+ self._test_proxy(_result.BufferedColumnResultProxy)
+
+ def test_buffered_row_growth(self):
+ with self._proxy_fixture(_result.BufferedRowResultProxy):
+ with self.engine.connect() as conn:
+ conn.execute(self.table.insert(), [
+ {'x': i, 'y': "t_%d" % i} for i in range(15, 1200)
+ ])
+ result = conn.execute(self.table.select())
+ checks = {
+ 0: 5, 1: 10, 9: 20, 135: 250, 274: 500,
+ 1351: 1000
+ }
+ for idx, row in enumerate(result, 0):
+ if idx in checks:
+ eq_(result._bufsize, checks[idx])
+ le_(
+ len(result._BufferedRowResultProxy__rowbuffer),
+ 1000
+ )
+
+ def test_max_row_buffer_option(self):
+ with self._proxy_fixture(_result.BufferedRowResultProxy):
+ with self.engine.connect() as conn:
+ conn.execute(self.table.insert(), [
+ {'x': i, 'y': "t_%d" % i} for i in range(15, 1200)
+ ])
+ result = conn.execution_options(max_row_buffer=27).execute(
+ self.table.select()
+ )
+ for idx, row in enumerate(result, 0):
+ if idx in (16, 70, 150, 250):
+ eq_(result._bufsize, 27)
+ le_(
+ len(result._BufferedRowResultProxy__rowbuffer),
+ 27
+ )
+