# Test the internal _wmi module on Windows
# This is used by the platform module, and potentially others
+import time
import unittest
-from test.support import import_helper, requires_resource
+from test.support import import_helper, requires_resource, LOOPBACK_TIMEOUT
# Do this first so test will be skipped if module doesn't exist
_wmi = import_helper.import_module('_wmi', required_on=['win'])
+def wmi_exec_query(query):
+ # gh-112278: WMI maybe slow response when first call.
+ try:
+ return _wmi.exec_query(query)
+ except WindowsError as e:
+ if e.winerror != 258:
+ raise
+ time.sleep(LOOPBACK_TIMEOUT)
+ return _wmi.exec_query(query)
+
+
class WmiTests(unittest.TestCase):
def test_wmi_query_os_version(self):
- r = _wmi.exec_query("SELECT Version FROM Win32_OperatingSystem").split("\0")
+ r = wmi_exec_query("SELECT Version FROM Win32_OperatingSystem").split("\0")
self.assertEqual(1, len(r))
k, eq, v = r[0].partition("=")
self.assertEqual("=", eq, r[0])
def test_wmi_query_error(self):
# Invalid queries fail with OSError
try:
- _wmi.exec_query("SELECT InvalidColumnName FROM InvalidTableName")
+ wmi_exec_query("SELECT InvalidColumnName FROM InvalidTableName")
except OSError as ex:
if ex.winerror & 0xFFFFFFFF == 0x80041010:
# This is the expected error code. All others should fail the test
def test_wmi_query_not_select(self):
# Queries other than SELECT are blocked to avoid potential exploits
with self.assertRaises(ValueError):
- _wmi.exec_query("not select, just in case someone tries something")
+ wmi_exec_query("not select, just in case someone tries something")
@requires_resource('cpu')
def test_wmi_query_overflow(self):
# Test multiple times to ensure consistency
for _ in range(2):
with self.assertRaises(OSError):
- _wmi.exec_query("SELECT * FROM CIM_DataFile")
+ wmi_exec_query("SELECT * FROM CIM_DataFile")
def test_wmi_query_multiple_rows(self):
# Multiple instances should have an extra null separator
- r = _wmi.exec_query("SELECT ProcessId FROM Win32_Process WHERE ProcessId < 1000")
+ r = wmi_exec_query("SELECT ProcessId FROM Win32_Process WHERE ProcessId < 1000")
self.assertFalse(r.startswith("\0"), r)
self.assertFalse(r.endswith("\0"), r)
it = iter(r.split("\0"))
from concurrent.futures import ThreadPoolExecutor
query = "SELECT ProcessId FROM Win32_Process WHERE ProcessId < 1000"
with ThreadPoolExecutor(4) as pool:
- task = [pool.submit(_wmi.exec_query, query) for _ in range(32)]
+ task = [pool.submit(wmi_exec_query, query) for _ in range(32)]
for t in task:
self.assertRegex(t.result(), "ProcessId=")