]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- Faster FOR UPDATE tests
authorJason Kirtland <jek@discorporate.us>
Tue, 12 Jun 2007 01:58:26 +0000 (01:58 +0000)
committerJason Kirtland <jek@discorporate.us>
Tue, 12 Jun 2007 01:58:26 +0000 (01:58 +0000)
test/engine/transaction.py

index 179a39b802d044cefc662cd8c853989b19e329e2..86093218ba495ba3af7b746cb6f4129f30c24ffa 100644 (file)
@@ -364,8 +364,7 @@ class ForUpdateTest(testbase.PersistTest):
     def tearDownAll(self):
         counters.drop(testbase.db)
 
-    def increment(self, count, errors, delay=False,
-                  delay_duration=0.025, update_style=True):
+    def increment(self, count, errors, update_style=True, delay=0.005):
         con = db.connect()
         sel = counters.select(for_update=update_style,
                               whereclause=counters.c.counter_id==1)
@@ -376,13 +375,10 @@ class ForUpdateTest(testbase.PersistTest):
                 existing = con.execute(sel).fetchone()
                 incr = existing['counter_value'] + 1
 
-                if delay and random.randint(1,20) <= delay:
-                    time.sleep(delay_duration)
-
+                time.sleep(delay)
                 con.execute(counters.update(counters.c.counter_id==1,
                                             values={'counter_value':incr}))
-                if delay and random.randint(1,20) <= delay:
-                    time.sleep(delay_duration)
+                time.sleep(delay)
 
                 readback = con.execute(sel).fetchone()
                 if (readback['counter_value'] != incr):
@@ -396,18 +392,24 @@ class ForUpdateTest(testbase.PersistTest):
 
         con.close()
 
-    def _threaded_increment(self, iterations, thread_count, delay,
-                            update_style):
+    @testbase.supported('mysql', 'oracle', 'postgres')
+    def testqueued_update(self):
+        """Test SELECT FOR UPDATE with concurrent modifications.
+
+        Runs concurrent modifications on a single row in the users table,
+        with each mutator trying to increment a value stored in user_name.
+        """
+
         db = testbase.db
         db.execute(counters.insert(), counter_id=1, counter_value=0)
 
+        iterations, thread_count = 10, 5
         threads, errors = [], []
         for i in xrange(thread_count):
             thread = threading.Thread(target=self.increment,
                                       args=(iterations,),
                                       kwargs={'errors': errors,
-                                              'delay': delay,
-                                              'update_style': update_style})
+                                              'update_style': True})
             thread.start()
             threads.append(thread)
         for thread in threads:
@@ -422,59 +424,51 @@ class ForUpdateTest(testbase.PersistTest):
         final = db.execute(sel).fetchone()
         self.assert_(final['counter_value'] == iterations * thread_count)
 
-    @testbase.supported('mysql', 'oracle', 'postgres')
-    def testqueued_fullspeed(self):
-        """Test SELECT FOR UPDATE.
-
-        Runs concurrent modifications on a single row in the users table,
-        with each mutator trying to increment a value stored in user_name.
-        
-        Updates are made as fast a possible, with no added delays.
-        """
-        self._threaded_increment(50, 5, False, True)
-
-    @testbase.supported('mysql', 'oracle', 'postgres')
-    def testqueued_delayed(self):
-        """Test SELECT FOR UPDATE with artificial delays.
-
-        Runs concurrent modifications on a single row in the users table,
-        with each mutator trying to increment a value stored in user_name.
-
-        Individual updates may random sleep, causing all updates to queue
-        for a while.
-        """
-        self._threaded_increment(50, 5, True, True)
-
-    @testbase.supported('oracle', 'postgres')
-    def testnowait(self):
-        """Test SELECT FOR UPDATE NOWAIT.
-
-        Run concurrent modifications on a single row with an artificial
-        delay, expecting that writers will abort when encountering the
-        locked row.
-        """
+    def overlap(self, ids, errors, update_style):
+        sel = counters.select(for_update=update_style,
+                              whereclause=counters.c.counter_id.in_(*ids))
+        con = db.connect()
+        trans = con.begin()
+        try:
+            rows = con.execute(sel).fetchall()
+            time.sleep(0.25)
+            trans.commit()
+        except Exception, e:
+            trans.rollback()
+            errors.append(e)
 
+    def _threaded_overlap(self, thread_count, groups, update_style=True, pool=5):
         db = testbase.db
-        db.execute(counters.insert(), counter_id=1, counter_value=0)
-        
-        iterations, thread_count = 4, 2
-        threads, errors = [], []
+        for cid in range(pool - 1):
+            db.execute(counters.insert(), counter_id=cid + 1, counter_value=0)
+
+        errors, threads = [], []
         for i in xrange(thread_count):
-            thread = threading.Thread(target=self.increment,
-                                      args=(iterations,),
-                                      kwargs={'errors': errors,
-                                              'delay': 20,
-                                              'delay_duration': 0.1,
-                                              'update_style': 'nowait'})
+            thread = threading.Thread(target=self.overlap,
+                                      args=(groups.pop(0), errors, update_style))
             thread.start()
             threads.append(thread)
         for thread in threads:
             thread.join()
 
+        return errors
+        
+    @testbase.supported('mysql', 'oracle', 'postgres')
+    def testqueued_select(self):
+        """Simple SELECT FOR UPDATE conflict test"""
+
+        errors = self._threaded_overlap(2, [(1,2,3),(3,4,5)])
+        for e in errors:
+            sys.stderr.write("Failure: %s\n" % e)
+        self.assert_(len(errors) == 0)
+
+    @testbase.supported('oracle', 'postgres')
+    def testnowait_select(self):
+        """Simple SELECT FOR UPDATE NOWAIT conflict test"""
+
+        errors = self._threaded_overlap(2, [(1,2,3),(3,4,5)],
+                                        update_style='nowait')
         self.assert_(len(errors) != 0)
-        sel = counters.select(whereclause=counters.c.counter_id==1)
-        final = db.execute(sel).fetchone()
-        self.assert_(final['counter_value'] != iterations * thread_count)
         
 if __name__ == "__main__":
     testbase.main()