order_by(users.c.user_id)).fetchall(),
[(1, ), (2, )])
-counters = None
-
-
-class ForUpdateTest(fixtures.TestBase):
- __requires__ = 'ad_hoc_engines',
- __backend__ = True
-
- @classmethod
- def setup_class(cls):
- global counters, metadata
- metadata = MetaData()
- counters = Table('forupdate_counters', metadata,
- Column('counter_id', INT, primary_key=True),
- Column('counter_value', INT),
- test_needs_acid=True)
- counters.create(testing.db)
-
- def teardown(self):
- testing.db.execute(counters.delete()).close()
-
- @classmethod
- def teardown_class(cls):
- counters.drop(testing.db)
-
- def increment(self, count, errors, update_style=True, delay=0.005):
- con = testing.db.connect()
- sel = counters.select(for_update=update_style,
- whereclause=counters.c.counter_id == 1)
- for i in range(count):
- trans = con.begin()
- try:
- existing = con.execute(sel).first()
- incr = existing['counter_value'] + 1
- time.sleep(delay)
- con.execute(counters.update(counters.c.counter_id == 1,
- values={'counter_value': incr}))
- time.sleep(delay)
- readback = con.execute(sel).first()
- if readback['counter_value'] != incr:
- raise AssertionError('Got %s post-update, expected '
- '%s' % (readback['counter_value'], incr))
- trans.commit()
- except Exception as e:
- trans.rollback()
- errors.append(e)
- break
- con.close()
-
- @testing.crashes('mssql', 'FIXME: unknown')
- @testing.crashes('firebird', 'FIXME: unknown')
- @testing.crashes('sybase', 'FIXME: unknown')
- @testing.requires.independent_connections
- def test_queued_update(self):
- """Test SELECT FOR UPDATE with concurrent modifications.
-
- Runs concurrent modifications on a single row in the users
- table, with each mutator trying to increment a value stored in
- user_name.
-
- """
-
- db = testing.db
- db.execute(counters.insert(), counter_id=1, counter_value=0)
- iterations, thread_count = 10, 5
- threads, errors = [], []
- for i in range(thread_count):
- thrd = threading.Thread(target=self.increment,
- args=(iterations, ),
- kwargs={'errors': errors,
- 'update_style': True})
- thrd.start()
- threads.append(thrd)
- for thrd in threads:
- thrd.join()
- assert not errors
- sel = counters.select(whereclause=counters.c.counter_id == 1)
- final = db.execute(sel).first()
- eq_(final['counter_value'], iterations * thread_count)
-
- def overlap(self, ids, errors, update_style):
-
- sel = counters.select(for_update=update_style,
- whereclause=counters.c.counter_id.in_(ids))
- con = testing.db.connect()
- trans = con.begin()
- try:
- rows = con.execute(sel).fetchall()
- time.sleep(0.50)
- trans.commit()
- except Exception as e:
- trans.rollback()
- errors.append(e)
- con.close()
-
- def _threaded_overlap(self, thread_count, groups, update_style=True, pool=5):
- db = testing.db
- for cid in range(pool - 1):
- db.execute(counters.insert(), counter_id=cid + 1,
- counter_value=0)
- errors, threads = [], []
- for i in range(thread_count):
- thrd = threading.Thread(target=self.overlap,
- args=(groups.pop(0), errors,
- update_style))
- time.sleep(0.20) # give the previous thread a chance to start
- # to ensure it gets a lock
- thrd.start()
- threads.append(thrd)
- for thrd in threads:
- thrd.join()
- return errors
-
- @testing.crashes('mssql', 'FIXME: unknown')
- @testing.crashes('firebird', 'FIXME: unknown')
- @testing.crashes('sybase', 'FIXME: unknown')
- @testing.requires.independent_connections
- def test_queued_select(self):
- """Simple SELECT FOR UPDATE conflict test"""
-
- errors = self._threaded_overlap(2, [(1, 2, 3), (3, 4, 5)])
- assert not errors
-
- @testing.crashes('mssql', 'FIXME: unknown')
- @testing.fails_on('mysql', 'No support for NOWAIT')
- @testing.crashes('firebird', 'FIXME: unknown')
- @testing.crashes('sybase', 'FIXME: unknown')
- @testing.requires.independent_connections
- def test_nowait_select(self):
- """Simple SELECT FOR UPDATE NOWAIT conflict test"""
-
- errors = self._threaded_overlap(2, [(1, 2, 3), (3, 4, 5)],
- update_style='nowait')
- assert errors
class IsolationLevelTest(fixtures.TestBase):
__requires__ = ('isolation_level', 'ad_hoc_engines')