]> git.ipfire.org Git - thirdparty/hostap.git/commitdiff
tests: Use python selector in the parallel-vm.py main loop
authorJouni Malinen <j@w1.fi>
Fri, 27 Dec 2019 15:12:34 +0000 (17:12 +0200)
committerJouni Malinen <j@w1.fi>
Fri, 27 Dec 2019 15:12:34 +0000 (17:12 +0200)
This gets rid of the loop that was polling for things to do every 0.25
seconds and instead, reacts to any data from VMs as soon as it becomes
available. This avoids unnecessary operations when no new data is
available and avoids unnecessary waits when new data becomes available
more quickly.

Signed-off-by: Jouni Malinen <j@w1.fi>
tests/hwsim/vm/parallel-vm.py

index a2a60dd4f35fce8300fed5a2ac48ece0de197c04..07fa3d52eb1d92da3277815f42bcdaf8b59d20f8 100755 (executable)
@@ -11,6 +11,7 @@ import curses
 import fcntl
 import logging
 import os
+import selectors
 import subprocess
 import sys
 import time
@@ -87,7 +88,7 @@ def get_failed(vm):
         failed += vm[i]['failed']
     return failed
 
-def vm_read_stdout(vm, i, test_queue):
+def vm_read_stdout(vm, test_queue):
     global total_started, total_passed, total_failed, total_skipped
     global rerun_failures
     global first_run_failures
@@ -102,7 +103,7 @@ def vm_read_stdout(vm, i, test_queue):
         if e.errno == errno.EAGAIN:
             return False
         raise
-    logger.debug("VM[%d] stdout.read[%s]" % (i, out))
+    logger.debug("VM[%d] stdout.read[%s]" % (vm['idx'], out))
     pending = vm['pending'] + out
     lines = []
     while True:
@@ -111,7 +112,7 @@ def vm_read_stdout(vm, i, test_queue):
             break
         line = pending[0:pos].rstrip()
         pending = pending[(pos + 1):]
-        logger.debug("VM[%d] stdout full line[%s]" % (i, line))
+        logger.debug("VM[%d] stdout full line[%s]" % (vm['idx'], line))
         if line.startswith("READY"):
             vm['starting'] = False
             vm['started'] = True
@@ -124,14 +125,15 @@ def vm_read_stdout(vm, i, test_queue):
             total_failed += 1
             vals = line.split(' ')
             if len(vals) < 2:
-                logger.info("VM[%d] incomplete FAIL line: %s" % (i, line))
+                logger.info("VM[%d] incomplete FAIL line: %s" % (vm['idx'],
+                                                                 line))
                 name = line
             else:
                 name = vals[1]
-            logger.debug("VM[%d] test case failed: %s" % (i, name))
+            logger.debug("VM[%d] test case failed: %s" % (vm['idx'], name))
             vm['failed'].append(name)
             if name != vm['current_name']:
-                logger.info("VM[%d] test result mismatch: %s (expected %s)" % (i, name, vm['current_name']))
+                logger.info("VM[%d] test result mismatch: %s (expected %s)" % (vm['idx'], name, vm['current_name']))
             else:
                 count = vm['current_count']
                 if count == 0:
@@ -142,7 +144,7 @@ def vm_read_stdout(vm, i, test_queue):
         elif line.startswith("NOT-FOUND"):
             ready = True
             total_failed += 1
-            logger.info("VM[%d] test case not found" % i)
+            logger.info("VM[%d] test case not found" % vm['idx'])
         elif line.startswith("SKIP"):
             ready = True
             total_skipped += 1
@@ -159,7 +161,8 @@ def vm_read_stdout(vm, i, test_queue):
     vm['pending'] = pending
     return ready
 
-def start_vm(vm):
+def start_vm(vm, sel):
+    logger.info("VM[%d] starting up" % (vm['idx'] + 1))
     vm['starting'] = True
     vm['proc'] = subprocess.Popen(vm['cmd'],
                                   stdin=subprocess.PIPE,
@@ -170,6 +173,7 @@ def start_vm(vm):
         fd = stream.fileno()
         fl = fcntl.fcntl(fd, fcntl.F_GETFL)
         fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
+        sel.register(stream, selectors.EVENT_READ, vm)
 
 def num_vm_starting():
     count = 0
@@ -178,6 +182,99 @@ def num_vm_starting():
             count += 1
     return count
 
+def vm_read_stderr(vm):
+    try:
+        err = vm['proc'].stderr.read()
+        if err != None:
+            err = err.decode()
+            if len(err) > 0:
+                vm['err'] += err
+                logger.info("VM[%d] stderr.read[%s]" % (vm['idx'], err))
+    except IOError as e:
+        if e.errno != errno.EAGAIN:
+            raise
+
+def vm_next_step(_vm, scr, test_queue):
+    scr.move(_vm['idx'] + 1, 10)
+    scr.clrtoeol()
+    if not test_queue:
+        _vm['proc'].stdin.write(b'\n')
+        _vm['proc'].stdin.flush()
+        scr.addstr("shutting down")
+        logger.info("VM[%d] shutting down" % _vm['idx'])
+        return
+    (name, count) = test_queue.pop(0)
+    _vm['current_name'] = name
+    _vm['current_count'] = count
+    _vm['proc'].stdin.write(name.encode() + b'\n')
+    _vm['proc'].stdin.flush()
+    scr.addstr(name)
+    logger.debug("VM[%d] start test %s" % (_vm['idx'], name))
+
+def check_vm_start(scr, sel, test_queue):
+    running = False
+    updated = False
+    for i in range(num_servers):
+        if not vm[i]['proc']:
+            # Either not yet started or already stopped VM
+            if test_queue and vm[i]['cmd'] and num_vm_starting() < 2:
+                scr.move(i + 1, 10)
+                scr.clrtoeol()
+                scr.addstr(i + 1, 10, "starting VM")
+                updated = True
+                start_vm(vm[i], sel)
+            else:
+                continue
+
+        running = True
+    return running, updated
+
+def vm_terminated(_vm, scr, sel, test_queue):
+    updated = False
+    for stream in [_vm['proc'].stdout, _vm['proc'].stderr]:
+        sel.unregister(stream)
+    _vm['proc'] = None
+    scr.move(_vm['idx'] + 1, 10)
+    scr.clrtoeol()
+    log = '{}/{}.srv.{}/console'.format(dir, timestamp, _vm['idx'] + 1)
+    with open(log, 'r') as f:
+        if "Kernel panic" in f.read():
+            scr.addstr("kernel panic")
+            logger.info("VM[%d] kernel panic" % _vm['idx'])
+            updated = True
+    if test_queue:
+        num_vm = 0
+        for i in range(num_servers):
+            if _vm['proc']:
+                num_vm += 1
+        if len(test_queue) > num_vm:
+            scr.addstr("unexpected exit")
+            logger.info("VM[%d] unexpected exit" % i)
+            updated = True
+    return updated
+
+def update_screen(scr, total_tests):
+    scr.move(num_servers + 1, 10)
+    scr.clrtoeol()
+    scr.addstr("{} %".format(int(100.0 * (total_passed + total_failed + total_skipped) / total_tests)))
+    scr.addstr(num_servers + 1, 20,
+               "TOTAL={} STARTED={} PASS={} FAIL={} SKIP={}".format(total_tests, total_started, total_passed, total_failed, total_skipped))
+    failed = get_failed(vm)
+    if len(failed) > 0:
+        scr.move(num_servers + 2, 0)
+        scr.clrtoeol()
+        scr.addstr("Failed test cases: ")
+        count = 0
+        for f in failed:
+            count += 1
+            if count > 30:
+                scr.addstr('...')
+                scr.clrtoeol()
+                break
+            scr.addstr(f)
+            scr.addstr(' ')
+    scr.refresh()
+
 def show_progress(scr):
     global num_servers
     global vm
@@ -188,10 +285,11 @@ def show_progress(scr):
     global total_started, total_passed, total_failed, total_skipped
     global rerun_failures
 
+    sel = selectors.DefaultSelector()
     total_tests = len(tests)
     logger.info("Total tests: %d" % total_tests)
     test_queue = [(t, 0) for t in tests]
-    start_vm(vm[0])
+    start_vm(vm[0], sel)
 
     scr.leaveok(1)
     scr.addstr(0, 0, "Parallel test execution status", curses.A_BOLD)
@@ -204,105 +302,27 @@ def show_progress(scr):
     scr.refresh()
 
     while True:
-        running = False
         updated = False
-
-        for i in range(num_servers):
-            if not vm[i]['proc']:
-                if test_queue and vm[i]['cmd'] and num_vm_starting() < 2:
-                    scr.move(i + 1, 10)
-                    scr.clrtoeol()
-                    scr.addstr(i + 1, 10, "starting VM")
-                    updated = True
-                    start_vm(vm[i])
+        events = sel.select(timeout=1)
+        for key, mask in events:
+            _vm = key.data
+            if not _vm['proc']:
                 continue
-            if vm[i]['proc'].poll() is not None:
-                vm[i]['proc'] = None
-                scr.move(i + 1, 10)
-                scr.clrtoeol()
-                log = '{}/{}.srv.{}/console'.format(dir, timestamp, i + 1)
-                with open(log, 'r') as f:
-                    if "Kernel panic" in f.read():
-                        scr.addstr("kernel panic")
-                        logger.info("VM[%d] kernel panic" % i)
-                        updated = True
-                    if test_queue:
-                        num_vm = 0
-                        for i in range(num_servers):
-                            if vm[i]['proc']:
-                                num_vm += 1
-                        if len(test_queue) > num_vm:
-                            scr.addstr("unexpected exit")
-                            logger.info("VM[%d] unexpected exit" % i)
-                            updated = True
-                continue
-
-            running = True
-            try:
-                err = vm[i]['proc'].stderr.read()
-                if err != None:
-                    err = err.decode()
-                    vm[i]['err'] += err
-                    logger.info("VM[%d] stderr.read[%s]" % (i, err))
-            except IOError as e:
-                if e.errno != errno.EAGAIN:
-                    raise
-
-            if vm_read_stdout(vm[i], i, test_queue):
-                scr.move(i + 1, 10)
-                scr.clrtoeol()
+            vm_read_stderr(_vm)
+            if vm_read_stdout(_vm, test_queue):
+                vm_next_step(_vm, scr, test_queue)
                 updated = True
-                if not test_queue:
-                    vm[i]['proc'].stdin.write(b'\n')
-                    vm[i]['proc'].stdin.flush()
-                    scr.addstr("shutting down")
-                    logger.info("VM[%d] shutting down" % i)
-                    continue
-                else:
-                    (name, count) = test_queue.pop(0)
-                    vm[i]['current_name'] = name
-                    vm[i]['current_count'] = count
-                    vm[i]['proc'].stdin.write(name.encode() + b'\n')
-                    vm[i]['proc'].stdin.flush()
-                    scr.addstr(name)
-                    logger.debug("VM[%d] start test %s" % (i, name))
-
-            try:
-                err = vm[i]['proc'].stderr.read()
-                if err != None:
-                    err = err.decode()
-                    vm[i]['err'] += err
-                    logger.debug("VM[%d] stderr.read[%s]" % (i, err))
-            except IOError as e:
-                if e.errno != errno.EAGAIN:
-                    raise
+            vm_read_stderr(_vm)
+            if _vm['proc'].poll() is not None:
+                if vm_terminated(_vm, scr, sel, test_queue):
+                    updated = True
 
+        running, run_update = check_vm_start(scr, sel, test_queue)
+        if updated or run_update:
+            update_screen(scr, total_tests)
         if not running:
             break
-
-        if updated:
-            scr.move(num_servers + 1, 10)
-            scr.clrtoeol()
-            scr.addstr("{} %".format(int(100.0 * (total_passed + total_failed + total_skipped) / total_tests)))
-            scr.addstr(num_servers + 1, 20, "TOTAL={} STARTED={} PASS={} FAIL={} SKIP={}".format(total_tests, total_started, total_passed, total_failed, total_skipped))
-            failed = get_failed(vm)
-            if len(failed) > 0:
-                scr.move(num_servers + 2, 0)
-                scr.clrtoeol()
-                scr.addstr("Failed test cases: ")
-                count = 0
-                for f in failed:
-                    count += 1
-                    if count > 30:
-                        scr.addstr('...')
-                        scr.clrtoeol()
-                        break
-                    scr.addstr(f)
-                    scr.addstr(' ')
-
-            scr.refresh()
-
-        time.sleep(0.25)
+    sel.close()
 
     for i in range(num_servers):
         if not vm[i]['proc']:
@@ -436,9 +456,6 @@ def main():
 
     vm = {}
     for i in range(0, num_servers):
-        print("\rStarting virtual machine {}/{}".format(i + 1, num_servers),
-              end='')
-        logger.info("Starting virtual machine {}/{}".format(i + 1, num_servers))
         cmd = [os.path.join(scriptsdir, 'vm-run.sh'),
                '--timestamp', str(timestamp),
                '--ext', 'srv.%d' % (i + 1),
@@ -446,6 +463,7 @@ def main():
         if args.telnet:
             cmd += ['--telnet', str(args.telnet + i)]
         vm[i] = {}
+        vm[i]['idx'] = i
         vm[i]['starting'] = False
         vm[i]['started'] = False
         vm[i]['cmd'] = cmd