From: Jason Kirtland Date: Tue, 12 Jun 2007 01:58:26 +0000 (+0000) Subject: - Faster FOR UPDATE tests X-Git-Tag: rel_0_4_6~206 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=e1ae27d94f4ee193ee7b2448c646c955679d65bf;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - Faster FOR UPDATE tests --- diff --git a/test/engine/transaction.py b/test/engine/transaction.py index 179a39b802..86093218ba 100644 --- a/test/engine/transaction.py +++ b/test/engine/transaction.py @@ -364,8 +364,7 @@ class ForUpdateTest(testbase.PersistTest): def tearDownAll(self): counters.drop(testbase.db) - def increment(self, count, errors, delay=False, - delay_duration=0.025, update_style=True): + def increment(self, count, errors, update_style=True, delay=0.005): con = db.connect() sel = counters.select(for_update=update_style, whereclause=counters.c.counter_id==1) @@ -376,13 +375,10 @@ class ForUpdateTest(testbase.PersistTest): existing = con.execute(sel).fetchone() incr = existing['counter_value'] + 1 - if delay and random.randint(1,20) <= delay: - time.sleep(delay_duration) - + time.sleep(delay) con.execute(counters.update(counters.c.counter_id==1, values={'counter_value':incr})) - if delay and random.randint(1,20) <= delay: - time.sleep(delay_duration) + time.sleep(delay) readback = con.execute(sel).fetchone() if (readback['counter_value'] != incr): @@ -396,18 +392,24 @@ class ForUpdateTest(testbase.PersistTest): con.close() - def _threaded_increment(self, iterations, thread_count, delay, - update_style): + @testbase.supported('mysql', 'oracle', 'postgres') + def testqueued_update(self): + """Test SELECT FOR UPDATE with concurrent modifications. + + Runs concurrent modifications on a single row in the users table, + with each mutator trying to increment a value stored in user_name. + """ + db = testbase.db db.execute(counters.insert(), counter_id=1, counter_value=0) + iterations, thread_count = 10, 5 threads, errors = [], [] for i in xrange(thread_count): thread = threading.Thread(target=self.increment, args=(iterations,), kwargs={'errors': errors, - 'delay': delay, - 'update_style': update_style}) + 'update_style': True}) thread.start() threads.append(thread) for thread in threads: @@ -422,59 +424,51 @@ class ForUpdateTest(testbase.PersistTest): final = db.execute(sel).fetchone() self.assert_(final['counter_value'] == iterations * thread_count) - @testbase.supported('mysql', 'oracle', 'postgres') - def testqueued_fullspeed(self): - """Test SELECT FOR UPDATE. - - Runs concurrent modifications on a single row in the users table, - with each mutator trying to increment a value stored in user_name. - - Updates are made as fast a possible, with no added delays. - """ - self._threaded_increment(50, 5, False, True) - - @testbase.supported('mysql', 'oracle', 'postgres') - def testqueued_delayed(self): - """Test SELECT FOR UPDATE with artificial delays. - - Runs concurrent modifications on a single row in the users table, - with each mutator trying to increment a value stored in user_name. - - Individual updates may random sleep, causing all updates to queue - for a while. - """ - self._threaded_increment(50, 5, True, True) - - @testbase.supported('oracle', 'postgres') - def testnowait(self): - """Test SELECT FOR UPDATE NOWAIT. - - Run concurrent modifications on a single row with an artificial - delay, expecting that writers will abort when encountering the - locked row. - """ + def overlap(self, ids, errors, update_style): + sel = counters.select(for_update=update_style, + whereclause=counters.c.counter_id.in_(*ids)) + con = db.connect() + trans = con.begin() + try: + rows = con.execute(sel).fetchall() + time.sleep(0.25) + trans.commit() + except Exception, e: + trans.rollback() + errors.append(e) + def _threaded_overlap(self, thread_count, groups, update_style=True, pool=5): db = testbase.db - db.execute(counters.insert(), counter_id=1, counter_value=0) - - iterations, thread_count = 4, 2 - threads, errors = [], [] + for cid in range(pool - 1): + db.execute(counters.insert(), counter_id=cid + 1, counter_value=0) + + errors, threads = [], [] for i in xrange(thread_count): - thread = threading.Thread(target=self.increment, - args=(iterations,), - kwargs={'errors': errors, - 'delay': 20, - 'delay_duration': 0.1, - 'update_style': 'nowait'}) + thread = threading.Thread(target=self.overlap, + args=(groups.pop(0), errors, update_style)) thread.start() threads.append(thread) for thread in threads: thread.join() + return errors + + @testbase.supported('mysql', 'oracle', 'postgres') + def testqueued_select(self): + """Simple SELECT FOR UPDATE conflict test""" + + errors = self._threaded_overlap(2, [(1,2,3),(3,4,5)]) + for e in errors: + sys.stderr.write("Failure: %s\n" % e) + self.assert_(len(errors) == 0) + + @testbase.supported('oracle', 'postgres') + def testnowait_select(self): + """Simple SELECT FOR UPDATE NOWAIT conflict test""" + + errors = self._threaded_overlap(2, [(1,2,3),(3,4,5)], + update_style='nowait') self.assert_(len(errors) != 0) - sel = counters.select(whereclause=counters.c.counter_id==1) - final = db.execute(sel).fetchone() - self.assert_(final['counter_value'] != iterations * thread_count) if __name__ == "__main__": testbase.main()