import sys
import functools
import itertools
+try:
+ import _wmi
+except ImportError:
+ _wmi = None
### Globals & Constants
version = _norm_version(version)
return system, release, version
-try:
- import _wmi
-except ImportError:
- def _wmi_query(*keys):
+
+def _wmi_query(table, *keys):
+ global _wmi
+ if not _wmi:
raise OSError("not supported")
-else:
- def _wmi_query(table, *keys):
- table = {
- "OS": "Win32_OperatingSystem",
- "CPU": "Win32_Processor",
- }[table]
+ table = {
+ "OS": "Win32_OperatingSystem",
+ "CPU": "Win32_Processor",
+ }[table]
+ try:
data = _wmi.exec_query("SELECT {} FROM {}".format(
",".join(keys),
table,
)).split("\0")
- split_data = (i.partition("=") for i in data)
- dict_data = {i[0]: i[2] for i in split_data}
- return (dict_data[k] for k in keys)
+ except OSError:
+ _wmi = None
+ raise OSError("not supported")
+ split_data = (i.partition("=") for i in data)
+ dict_data = {i[0]: i[2] for i in split_data}
+ return (dict_data[k] for k in keys)
_WIN32_CLIENT_RELEASES = [
LPCWSTR query;
HANDLE writePipe;
HANDLE readPipe;
+ HANDLE connectEvent;
};
NULL, NULL, 0, NULL, 0, 0, &services
);
}
+ if (!SetEvent(data->connectEvent)) {
+ hr = HRESULT_FROM_WIN32(GetLastError());
+ }
if (SUCCEEDED(hr)) {
hr = CoSetProxyBlanket(
services, RPC_C_AUTHN_WINNT, RPC_C_AUTHZ_NONE, NULL,
Py_BEGIN_ALLOW_THREADS
- if (!CreatePipe(&data.readPipe, &data.writePipe, NULL, 0)) {
+ data.connectEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
+ if (!data.connectEvent || !CreatePipe(&data.readPipe, &data.writePipe, NULL, 0)) {
err = GetLastError();
} else {
hThread = CreateThread(NULL, 0, _query_thread, (LPVOID*)&data, 0, NULL);
}
}
+ // gh-112278: If current user doesn't have permission to query the WMI, the
+ // function IWbemLocator::ConnectServer will hang for 5 seconds, and there
+ // is no way to specify the timeout. So we use an Event object to simulate
+ // a timeout.
+ switch (WaitForSingleObject(data.connectEvent, 100)) {
+ case WAIT_OBJECT_0:
+ break;
+ case WAIT_TIMEOUT:
+ err = WAIT_TIMEOUT;
+ break;
+ default:
+ err = GetLastError();
+ break;
+ }
+
while (!err) {
if (ReadFile(
data.readPipe,
}
// Allow the thread some time to clean up
- switch (WaitForSingleObject(hThread, 1000)) {
+ switch (WaitForSingleObject(hThread, 100)) {
case WAIT_OBJECT_0:
// Thread ended cleanly
if (!GetExitCodeThread(hThread, (LPDWORD)&err)) {
}
CloseHandle(hThread);
+ CloseHandle(data.connectEvent);
hThread = NULL;
Py_END_ALLOW_THREADS