]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-112278: Add retry in WMI tests in case of slow initialization (GH-113154)
authorAN Long <aisk@users.noreply.github.com>
Fri, 15 Dec 2023 13:42:37 +0000 (21:42 +0800)
committerGitHub <noreply@github.com>
Fri, 15 Dec 2023 13:42:37 +0000 (13:42 +0000)
Lib/test/test_wmi.py

index 3445702846d8a044cac8a54e96def03f75ee31cc..bf8c52e646dc18d2799967574f3546791a163ac2 100644 (file)
@@ -1,17 +1,29 @@
 # Test the internal _wmi module on Windows
 # This is used by the platform module, and potentially others
 
+import time
 import unittest
-from test.support import import_helper, requires_resource
+from test.support import import_helper, requires_resource, LOOPBACK_TIMEOUT
 
 
 # Do this first so test will be skipped if module doesn't exist
 _wmi = import_helper.import_module('_wmi', required_on=['win'])
 
 
+def wmi_exec_query(query):
+    # gh-112278: WMI maybe slow response when first call.
+    try:
+        return _wmi.exec_query(query)
+    except WindowsError as e:
+        if e.winerror != 258:
+            raise
+        time.sleep(LOOPBACK_TIMEOUT)
+        return _wmi.exec_query(query)
+
+
 class WmiTests(unittest.TestCase):
     def test_wmi_query_os_version(self):
-        r = _wmi.exec_query("SELECT Version FROM Win32_OperatingSystem").split("\0")
+        r = wmi_exec_query("SELECT Version FROM Win32_OperatingSystem").split("\0")
         self.assertEqual(1, len(r))
         k, eq, v = r[0].partition("=")
         self.assertEqual("=", eq, r[0])
@@ -28,7 +40,7 @@ class WmiTests(unittest.TestCase):
     def test_wmi_query_error(self):
         # Invalid queries fail with OSError
         try:
-            _wmi.exec_query("SELECT InvalidColumnName FROM InvalidTableName")
+            wmi_exec_query("SELECT InvalidColumnName FROM InvalidTableName")
         except OSError as ex:
             if ex.winerror & 0xFFFFFFFF == 0x80041010:
                 # This is the expected error code. All others should fail the test
@@ -42,7 +54,7 @@ class WmiTests(unittest.TestCase):
     def test_wmi_query_not_select(self):
         # Queries other than SELECT are blocked to avoid potential exploits
         with self.assertRaises(ValueError):
-            _wmi.exec_query("not select, just in case someone tries something")
+            wmi_exec_query("not select, just in case someone tries something")
 
     @requires_resource('cpu')
     def test_wmi_query_overflow(self):
@@ -50,11 +62,11 @@ class WmiTests(unittest.TestCase):
         # Test multiple times to ensure consistency
         for _ in range(2):
             with self.assertRaises(OSError):
-                _wmi.exec_query("SELECT * FROM CIM_DataFile")
+                wmi_exec_query("SELECT * FROM CIM_DataFile")
 
     def test_wmi_query_multiple_rows(self):
         # Multiple instances should have an extra null separator
-        r = _wmi.exec_query("SELECT ProcessId FROM Win32_Process WHERE ProcessId < 1000")
+        r = wmi_exec_query("SELECT ProcessId FROM Win32_Process WHERE ProcessId < 1000")
         self.assertFalse(r.startswith("\0"), r)
         self.assertFalse(r.endswith("\0"), r)
         it = iter(r.split("\0"))
@@ -69,6 +81,6 @@ class WmiTests(unittest.TestCase):
         from concurrent.futures import ThreadPoolExecutor
         query = "SELECT ProcessId FROM Win32_Process WHERE ProcessId < 1000"
         with ThreadPoolExecutor(4) as pool:
-            task = [pool.submit(_wmi.exec_query, query) for _ in range(32)]
+            task = [pool.submit(wmi_exec_query, query) for _ in range(32)]
             for t in task:
                 self.assertRegex(t.result(), "ProcessId=")