def tearDownAll(self):
counters.drop(testbase.db)
- def increment(self, count, errors, delay=False,
- delay_duration=0.025, update_style=True):
+ def increment(self, count, errors, update_style=True, delay=0.005):
con = db.connect()
sel = counters.select(for_update=update_style,
whereclause=counters.c.counter_id==1)
existing = con.execute(sel).fetchone()
incr = existing['counter_value'] + 1
- if delay and random.randint(1,20) <= delay:
- time.sleep(delay_duration)
-
+ time.sleep(delay)
con.execute(counters.update(counters.c.counter_id==1,
values={'counter_value':incr}))
- if delay and random.randint(1,20) <= delay:
- time.sleep(delay_duration)
+ time.sleep(delay)
readback = con.execute(sel).fetchone()
if (readback['counter_value'] != incr):
con.close()
- def _threaded_increment(self, iterations, thread_count, delay,
- update_style):
+ @testbase.supported('mysql', 'oracle', 'postgres')
+ def testqueued_update(self):
+ """Test SELECT FOR UPDATE with concurrent modifications.
+
+ Runs concurrent modifications on a single row in the users table,
+ with each mutator trying to increment a value stored in user_name.
+ """
+
db = testbase.db
db.execute(counters.insert(), counter_id=1, counter_value=0)
+ iterations, thread_count = 10, 5
threads, errors = [], []
for i in xrange(thread_count):
thread = threading.Thread(target=self.increment,
args=(iterations,),
kwargs={'errors': errors,
- 'delay': delay,
- 'update_style': update_style})
+ 'update_style': True})
thread.start()
threads.append(thread)
for thread in threads:
final = db.execute(sel).fetchone()
self.assert_(final['counter_value'] == iterations * thread_count)
- @testbase.supported('mysql', 'oracle', 'postgres')
- def testqueued_fullspeed(self):
- """Test SELECT FOR UPDATE.
-
- Runs concurrent modifications on a single row in the users table,
- with each mutator trying to increment a value stored in user_name.
-
- Updates are made as fast a possible, with no added delays.
- """
- self._threaded_increment(50, 5, False, True)
-
- @testbase.supported('mysql', 'oracle', 'postgres')
- def testqueued_delayed(self):
- """Test SELECT FOR UPDATE with artificial delays.
-
- Runs concurrent modifications on a single row in the users table,
- with each mutator trying to increment a value stored in user_name.
-
- Individual updates may random sleep, causing all updates to queue
- for a while.
- """
- self._threaded_increment(50, 5, True, True)
-
- @testbase.supported('oracle', 'postgres')
- def testnowait(self):
- """Test SELECT FOR UPDATE NOWAIT.
-
- Run concurrent modifications on a single row with an artificial
- delay, expecting that writers will abort when encountering the
- locked row.
- """
+ def overlap(self, ids, errors, update_style):
+ sel = counters.select(for_update=update_style,
+ whereclause=counters.c.counter_id.in_(*ids))
+ con = db.connect()
+ trans = con.begin()
+ try:
+ rows = con.execute(sel).fetchall()
+ time.sleep(0.25)
+ trans.commit()
+ except Exception, e:
+ trans.rollback()
+ errors.append(e)
+ def _threaded_overlap(self, thread_count, groups, update_style=True, pool=5):
db = testbase.db
- db.execute(counters.insert(), counter_id=1, counter_value=0)
-
- iterations, thread_count = 4, 2
- threads, errors = [], []
+ for cid in range(pool - 1):
+ db.execute(counters.insert(), counter_id=cid + 1, counter_value=0)
+
+ errors, threads = [], []
for i in xrange(thread_count):
- thread = threading.Thread(target=self.increment,
- args=(iterations,),
- kwargs={'errors': errors,
- 'delay': 20,
- 'delay_duration': 0.1,
- 'update_style': 'nowait'})
+ thread = threading.Thread(target=self.overlap,
+ args=(groups.pop(0), errors, update_style))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
+ return errors
+
+ @testbase.supported('mysql', 'oracle', 'postgres')
+ def testqueued_select(self):
+ """Simple SELECT FOR UPDATE conflict test"""
+
+ errors = self._threaded_overlap(2, [(1,2,3),(3,4,5)])
+ for e in errors:
+ sys.stderr.write("Failure: %s\n" % e)
+ self.assert_(len(errors) == 0)
+
+ @testbase.supported('oracle', 'postgres')
+ def testnowait_select(self):
+ """Simple SELECT FOR UPDATE NOWAIT conflict test"""
+
+ errors = self._threaded_overlap(2, [(1,2,3),(3,4,5)],
+ update_style='nowait')
self.assert_(len(errors) != 0)
- sel = counters.select(whereclause=counters.c.counter_id==1)
- final = db.execute(sel).fetchone()
- self.assert_(final['counter_value'] != iterations * thread_count)
if __name__ == "__main__":
testbase.main()