import os
import hashlib
import random
+import threading
import time
-try:
- import threading as _threading
-except ImportError: # pragma: no cover
- import dummy_threading as _threading # type: ignore
-
class EntropyPool:
self.pool_index = 0
self.digest: Optional[bytearray] = None
self.next_byte = 0
- self.lock = _threading.Lock()
+ self.lock = threading.Lock()
self.hash = hashlib.sha1()
self.hash_len = 20
self.pool = bytearray(b"\0" * self.hash_len)
import contextlib
import socket
import sys
+import threading
import time
import random
import warnings
-try:
- import threading as _threading
-except ImportError: # pragma: no cover
- import dummy_threading as _threading # type: ignore
-
import dns.exception
import dns.edns
import dns.flags
class CacheBase:
def __init__(self):
- self.lock = _threading.Lock()
+ self.lock = threading.Lock()
self.statistics = CacheStatistics()
def reset_statistics(self) -> None:
from typing import Callable, Deque, Optional, Set, Union
import collections
-
-try:
- import threading as _threading
-except ImportError: # pragma: no cover
- import dummy_threading as _threading # type: ignore
+import threading
import dns.exception
import dns.immutable
"""
super().__init__(origin, rdclass, relativize)
self._versions: Deque[Version] = collections.deque()
- self._version_lock = _threading.Lock()
+ self._version_lock = threading.Lock()
if pruning_policy is None:
self._pruning_policy = self._default_pruning_policy
else:
self._pruning_policy = pruning_policy
self._write_txn: Optional[Transaction] = None
- self._write_event: Optional[_threading.Event] = None
- self._write_waiters: Deque[_threading.Event] = collections.deque()
+ self._write_event: Optional[threading.Event] = None
+ self._write_waiters: Deque[threading.Event] = collections.deque()
self._readers: Set[Transaction] = set()
self._commit_version_unlocked(
None, WritableVersion(self, replacement=True), origin
# Someone else is writing already, so we will have to
# wait, but we want to do the actual wait outside the
# lock.
- event = _threading.Event()
+ event = threading.Event()
self._write_waiters.append(event)
# wait (note we gave up the lock!)
#
import winreg
try:
- try:
- import threading as _threading
- except ImportError: # pragma: no cover
- import dummy_threading as _threading # type: ignore
+ import threading
import pythoncom
import wmi
if _have_wmi:
- class _WMIGetter(_threading.Thread):
+ class _WMIGetter(threading.Thread):
def __init__(self):
super().__init__()
self.info = DnsInfo()