]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- these tests don't test anything in SQLAlchemy - from our perpsective,
authorMike Bayer <mike_mp@zzzcomputing.com>
Fri, 19 Sep 2014 16:39:34 +0000 (12:39 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Fri, 19 Sep 2014 16:42:03 +0000 (12:42 -0400)
we need to be in transactions (tested elsewhere) and we need to
emit the correct FOR UPDATE strings (tested elsewhere).  There's nothing
in SQLA to be tested as far as validating that for update causes exceptions
or not, and these tests frequently fail as they are timing sensitive.

test/engine/test_transaction.py

index f9744444d296943f5e27305c1b4bc73cbd9bef43..c6802cb3ed5f758aad177c494bbd5e9748a35751 100644 (file)
@@ -1125,139 +1125,6 @@ class TLTransactionTest(fixtures.TestBase):
             order_by(users.c.user_id)).fetchall(),
             [(1, ), (2, )])
 
-counters = None
-
-
-class ForUpdateTest(fixtures.TestBase):
-    __requires__ = 'ad_hoc_engines',
-    __backend__ = True
-
-    @classmethod
-    def setup_class(cls):
-        global counters, metadata
-        metadata = MetaData()
-        counters = Table('forupdate_counters', metadata,
-                         Column('counter_id', INT, primary_key=True),
-                         Column('counter_value', INT),
-                         test_needs_acid=True)
-        counters.create(testing.db)
-
-    def teardown(self):
-        testing.db.execute(counters.delete()).close()
-
-    @classmethod
-    def teardown_class(cls):
-        counters.drop(testing.db)
-
-    def increment(self, count, errors, update_style=True, delay=0.005):
-        con = testing.db.connect()
-        sel = counters.select(for_update=update_style,
-                              whereclause=counters.c.counter_id == 1)
-        for i in range(count):
-            trans = con.begin()
-            try:
-                existing = con.execute(sel).first()
-                incr = existing['counter_value'] + 1
-                time.sleep(delay)
-                con.execute(counters.update(counters.c.counter_id == 1,
-                            values={'counter_value': incr}))
-                time.sleep(delay)
-                readback = con.execute(sel).first()
-                if readback['counter_value'] != incr:
-                    raise AssertionError('Got %s post-update, expected '
-                            '%s' % (readback['counter_value'], incr))
-                trans.commit()
-            except Exception as e:
-                trans.rollback()
-                errors.append(e)
-                break
-        con.close()
-
-    @testing.crashes('mssql', 'FIXME: unknown')
-    @testing.crashes('firebird', 'FIXME: unknown')
-    @testing.crashes('sybase', 'FIXME: unknown')
-    @testing.requires.independent_connections
-    def test_queued_update(self):
-        """Test SELECT FOR UPDATE with concurrent modifications.
-
-        Runs concurrent modifications on a single row in the users
-        table, with each mutator trying to increment a value stored in
-        user_name.
-
-        """
-
-        db = testing.db
-        db.execute(counters.insert(), counter_id=1, counter_value=0)
-        iterations, thread_count = 10, 5
-        threads, errors = [], []
-        for i in range(thread_count):
-            thrd = threading.Thread(target=self.increment,
-                                    args=(iterations, ),
-                                    kwargs={'errors': errors,
-                                    'update_style': True})
-            thrd.start()
-            threads.append(thrd)
-        for thrd in threads:
-            thrd.join()
-        assert not errors
-        sel = counters.select(whereclause=counters.c.counter_id == 1)
-        final = db.execute(sel).first()
-        eq_(final['counter_value'], iterations * thread_count)
-
-    def overlap(self, ids, errors, update_style):
-
-        sel = counters.select(for_update=update_style,
-                              whereclause=counters.c.counter_id.in_(ids))
-        con = testing.db.connect()
-        trans = con.begin()
-        try:
-            rows = con.execute(sel).fetchall()
-            time.sleep(0.50)
-            trans.commit()
-        except Exception as e:
-            trans.rollback()
-            errors.append(e)
-        con.close()
-
-    def _threaded_overlap(self, thread_count, groups, update_style=True, pool=5):
-        db = testing.db
-        for cid in range(pool - 1):
-            db.execute(counters.insert(), counter_id=cid + 1,
-                       counter_value=0)
-        errors, threads = [], []
-        for i in range(thread_count):
-            thrd = threading.Thread(target=self.overlap,
-                                    args=(groups.pop(0), errors,
-                                    update_style))
-            time.sleep(0.20)  # give the previous thread a chance to start
-                              # to ensure it gets a lock
-            thrd.start()
-            threads.append(thrd)
-        for thrd in threads:
-            thrd.join()
-        return errors
-
-    @testing.crashes('mssql', 'FIXME: unknown')
-    @testing.crashes('firebird', 'FIXME: unknown')
-    @testing.crashes('sybase', 'FIXME: unknown')
-    @testing.requires.independent_connections
-    def test_queued_select(self):
-        """Simple SELECT FOR UPDATE conflict test"""
-
-        errors = self._threaded_overlap(2, [(1, 2, 3), (3, 4, 5)])
-        assert not errors
-
-    @testing.crashes('mssql', 'FIXME: unknown')
-    @testing.fails_on('mysql', 'No support for NOWAIT')
-    @testing.crashes('firebird', 'FIXME: unknown')
-    @testing.crashes('sybase', 'FIXME: unknown')
-    @testing.requires.independent_connections
-    def test_nowait_select(self):
-        """Simple SELECT FOR UPDATE NOWAIT conflict test"""
-
-        errors = self._threaded_overlap(2, [(1, 2, 3), (3, 4, 5)],
-                update_style='nowait')
-        assert errors
 
 class IsolationLevelTest(fixtures.TestBase):
     __requires__ = ('isolation_level', 'ad_hoc_engines')