]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- these tests don't test anything in SQLAlchemy - from our perpsective,
authorMike Bayer <mike_mp@zzzcomputing.com>
Fri, 19 Sep 2014 16:39:34 +0000 (12:39 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Fri, 19 Sep 2014 16:41:07 +0000 (12:41 -0400)
we need to be in transactions (tested elsewhere) and we need to
emit the correct FOR UPDATE strings (tested elsewhere).  There's nothing
in SQLA to be tested as far as validating that for update causes exceptions
or not, and these tests frequently fail as they are timing sensitive.

test/engine/test_transaction.py

index 8a530364221d75cb2809a7f5649856f0a08ff21c..d921e9ead4056ec016f8dcffb8f4b4536114d82c 100644 (file)
@@ -1126,139 +1126,6 @@ class TLTransactionTest(fixtures.TestBase):
             order_by(users.c.user_id)).fetchall(),
             [(1, ), (2, )])
 
-counters = None
-
-
-class ForUpdateTest(fixtures.TestBase):
-    __requires__ = 'ad_hoc_engines',
-    __backend__ = True
-
-    @classmethod
-    def setup_class(cls):
-        global counters, metadata
-        metadata = MetaData()
-        counters = Table('forupdate_counters', metadata,
-                         Column('counter_id', INT, primary_key=True),
-                         Column('counter_value', INT),
-                         test_needs_acid=True)
-        counters.create(testing.db)
-
-    def teardown(self):
-        testing.db.execute(counters.delete()).close()
-
-    @classmethod
-    def teardown_class(cls):
-        counters.drop(testing.db)
-
-    def increment(self, count, errors, update_style=True, delay=0.005):
-        con = testing.db.connect()
-        sel = counters.select(for_update=update_style,
-                              whereclause=counters.c.counter_id == 1)
-        for i in range(count):
-            trans = con.begin()
-            try:
-                existing = con.execute(sel).first()
-                incr = existing['counter_value'] + 1
-                time.sleep(delay)
-                con.execute(counters.update(counters.c.counter_id == 1,
-                            values={'counter_value': incr}))
-                time.sleep(delay)
-                readback = con.execute(sel).first()
-                if readback['counter_value'] != incr:
-                    raise AssertionError('Got %s post-update, expected '
-                            '%s' % (readback['counter_value'], incr))
-                trans.commit()
-            except Exception as e:
-                trans.rollback()
-                errors.append(e)
-                break
-        con.close()
-
-    @testing.crashes('mssql', 'FIXME: unknown')
-    @testing.crashes('firebird', 'FIXME: unknown')
-    @testing.crashes('sybase', 'FIXME: unknown')
-    @testing.requires.independent_connections
-    def test_queued_update(self):
-        """Test SELECT FOR UPDATE with concurrent modifications.
-
-        Runs concurrent modifications on a single row in the users
-        table, with each mutator trying to increment a value stored in
-        user_name.
-
-        """
-
-        db = testing.db
-        db.execute(counters.insert(), counter_id=1, counter_value=0)
-        iterations, thread_count = 10, 5
-        threads, errors = [], []
-        for i in range(thread_count):
-            thrd = threading.Thread(target=self.increment,
-                                    args=(iterations, ),
-                                    kwargs={'errors': errors,
-                                    'update_style': True})
-            thrd.start()
-            threads.append(thrd)
-        for thrd in threads:
-            thrd.join()
-        assert not errors
-        sel = counters.select(whereclause=counters.c.counter_id == 1)
-        final = db.execute(sel).first()
-        eq_(final['counter_value'], iterations * thread_count)
-
-    def overlap(self, ids, errors, update_style):
-
-        sel = counters.select(for_update=update_style,
-                              whereclause=counters.c.counter_id.in_(ids))
-        con = testing.db.connect()
-        trans = con.begin()
-        try:
-            rows = con.execute(sel).fetchall()
-            time.sleep(0.50)
-            trans.commit()
-        except Exception as e:
-            trans.rollback()
-            errors.append(e)
-        con.close()
-
-    def _threaded_overlap(self, thread_count, groups, update_style=True, pool=5):
-        db = testing.db
-        for cid in range(pool - 1):
-            db.execute(counters.insert(), counter_id=cid + 1,
-                       counter_value=0)
-        errors, threads = [], []
-        for i in range(thread_count):
-            thrd = threading.Thread(target=self.overlap,
-                                    args=(groups.pop(0), errors,
-                                    update_style))
-            time.sleep(0.20)  # give the previous thread a chance to start
-                              # to ensure it gets a lock
-            thrd.start()
-            threads.append(thrd)
-        for thrd in threads:
-            thrd.join()
-        return errors
-
-    @testing.crashes('mssql', 'FIXME: unknown')
-    @testing.crashes('firebird', 'FIXME: unknown')
-    @testing.crashes('sybase', 'FIXME: unknown')
-    @testing.requires.independent_connections
-    def test_queued_select(self):
-        """Simple SELECT FOR UPDATE conflict test"""
-
-        errors = self._threaded_overlap(2, [(1, 2, 3), (3, 4, 5)])
-        assert not errors
-
-    @testing.crashes('mssql', 'FIXME: unknown')
-    @testing.fails_on('mysql', 'No support for NOWAIT')
-    @testing.crashes('firebird', 'FIXME: unknown')
-    @testing.crashes('sybase', 'FIXME: unknown')
-    @testing.requires.independent_connections
-    def test_nowait_select(self):
-        """Simple SELECT FOR UPDATE NOWAIT conflict test"""
-
-        errors = self._threaded_overlap(2, [(1, 2, 3), (3, 4, 5)],
-                update_style='nowait')
-        assert errors
 
 class IsolationLevelTest(fixtures.TestBase):
     __requires__ = ('isolation_level', 'ad_hoc_engines')