# Ignoring typing here, because I can't find a way to make the _used dict
# typed based on the subclass. I am not even sure it differs between subclasses;
# it is probably still the same dict. But we don't really care about that.
- return cls._used[typ][n] # type: ignore
+ return cls._used[typ][n] # type: ignore[return-value]
val = cls(typ, n, _i_know_what_i_am_doing=True)
cls._used[typ][n] = val
return val
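# Illustrative sketch (not part of the change above; class and method names are
# simplified stand-ins): one way to avoid a blanket "type: ignore" here is to keep
# the registry loosely typed on the base class and narrow the return type in a
# generic accessor with cast().
from typing import Any, Dict, Type, TypeVar, cast

_T = TypeVar("_T", bound="_InternedID")

class _InternedID:
    # deliberately shared across subclasses; values are narrowed on the way out
    _used: Dict[Any, Dict[int, "_InternedID"]] = {}

    def __init__(self, typ: Any, n: int) -> None:
        self.typ = typ
        self.n = n

    @classmethod
    def new(cls: Type[_T], typ: Any, n: int) -> _T:
        per_type = cls._used.setdefault(typ, {})
        if n in per_type:
            return cast(_T, per_type[n])
        val = cls(typ, n)
        per_type[n] = val
        return val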
# proper closing of the socket is only implemented in later versions of Python
if sys.version_info >= (3, 7):
- await writer.wait_closed() # type: ignore
+ await writer.wait_closed()
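# Minimal sketch of the surrounding pattern (assumed context, not part of the diff):
# close an asyncio StreamWriter and, where the API exists (Python >= 3.7), wait for
# the underlying transport to actually close before returning.
import asyncio
import sys

async def _close_stream(writer: asyncio.StreamWriter) -> None:
    writer.close()
    if sys.version_info >= (3, 7):
        # StreamWriter.wait_closed() was only added in Python 3.7
        await writer.wait_closed()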
class SubprocessController(ABC):
logger = logging.getLogger(__name__)
if PROMETHEUS_LIB:
- from prometheus_client import exposition # type: ignore
- from prometheus_client.bridge.graphite import GraphiteBridge # type: ignore
+ from prometheus_client import exposition
+ from prometheus_client.bridge.graphite import GraphiteBridge
from prometheus_client.core import (
REGISTRY,
CounterMetricFamily,
- GaugeMetricFamily, # type: ignore
+ GaugeMetricFamily,
HistogramMetricFamily,
Metric,
)
def _counter(name: str, description: str, label: Tuple[str, str], value: float) -> CounterMetricFamily:
c = CounterMetricFamily(name, description, labels=(label[0],))
- c.add_metric((label[1],), value) # type: ignore
+ c.add_metric((label[1],), value)
return c
def _gauge(name: str, description: str, label: Tuple[str, str], value: float) -> GaugeMetricFamily:
c = GaugeMetricFamily(name, description, labels=(label[0],))
- c.add_metric((label[1],), value) # type: ignore
+ c.add_metric((label[1],), value)
return c
def _histogram(
name: str, description: str, label: Tuple[str, str], buckets: List[Tuple[str, int]], sum_value: float
) -> HistogramMetricFamily:
c = HistogramMetricFamily(name, description, labels=(label[0],))
- c.add_metric((label[1],), buckets, sum_value=sum_value) # type: ignore
+ c.add_metric((label[1],), buckets, sum_value=sum_value)
return c
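# Hypothetical call of the helper above (metric name, label and values are invented
# for illustration): prometheus_client expects histogram buckets as cumulative
# (upper_bound, count) pairs ending with "+Inf", plus the running sum of observations.
_example = _histogram(
    "resolver_latency_seconds",
    "time it took to answer queries",
    label=("instance_id", "kresd1"),
    buckets=[("0.001", 3), ("0.01", 10), ("+Inf", 12)],
    sum_value=0.042,
)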
def _parse_resolver_metrics(instance_id: "KresID", metrics: Any) -> Generator[Metric, None, None]:
_graphite_bridge = GraphiteBridge(
(str(config.monitoring.graphite.host), int(config.monitoring.graphite.port))
)
- _graphite_bridge.start( # type: ignore
+ _graphite_bridge.start(
interval=config.monitoring.graphite.interval.seconds(), prefix=str(config.monitoring.graphite.prefix)
)
# init and register metrics collector
global _metrics_collector
_metrics_collector = KresPrometheusMetricsCollector(config_store)
- REGISTRY.register(_metrics_collector) # type: ignore
+ REGISTRY.register(_metrics_collector) # type: ignore[arg-type]
# register graphite bridge
await config_store.register_verifier(_deny_turning_off_graphite_bridge)
await _metrics_collector.collect_kresd_stats()
else:
raise RuntimeError("Function invoked before initializing the module!")
- return exposition.generate_latest() # type: ignore
+ return exposition.generate_latest()
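# Note on the call above (standard prometheus_client behaviour, not project-specific):
# generate_latest() renders the default REGISTRY into the Prometheus text exposition
# format and returns it as bytes, ready to be served from a /metrics endpoint.
from prometheus_client import exposition
payload: bytes = exposition.generate_latest()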
return None
def unblock_signals() -> None:
if sys.version_info >= (3, 8):
- signal.pthread_sigmask(signal.SIG_UNBLOCK, signal.valid_signals()) # type: ignore
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, signal.valid_signals())
else:
# the list of signals is not exhaustive, but it should cover all signals we might ever want to block
signal.pthread_sigmask(
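# Hedged sketch of the pre-3.8 branch started above (the concrete signal list used by
# the project may differ): before Python 3.8 there is no signal.valid_signals(), so an
# explicit set of signals has to be unblocked instead.
import signal

_FALLBACK_SIGNALS = {
    signal.SIGHUP, signal.SIGINT, signal.SIGTERM,
    signal.SIGUSR1, signal.SIGUSR2, signal.SIGCHLD,
}
signal.pthread_sigmask(signal.SIG_UNBLOCK, _FALLBACK_SIGNALS)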
def _create_default(self, obj: Any) -> Any:
if isinstance(obj, _LazyDefault):
- return obj.instantiate() # type: ignore
+ return obj.instantiate()
return obj
def map_object( # noqa: PLR0911, PLR0912
if is_generic_type_wrapper(tp):
inner_type = get_generic_type_wrapper_argument(tp)
obj_valid = self.map_object(inner_type, obj, object_path)
- return tp(obj_valid, object_path=object_path) # type: ignore
+ return tp(obj_valid, object_path=object_path)
# nested BaseSchema subclasses
if inspect.isclass(tp) and issubclass(tp, BaseSchema):
# As this is a delegated constructor, we must ignore protected access warnings
# sanity check
- if not isinstance(source, (BaseSchema, dict)): # type: ignore
+ if not isinstance(source, (BaseSchema, dict)):
raise DataValidationError(f"expected dict-like object, found '{type(source)}'", object_path)
# construct lower level schema first if configured to do so
raise ConstructorError(None, None, f"expected a mapping node, but found {node.id}", node.start_mark)
mapping: Dict[Any, Any] = {}
for key_node, value_node in node.value:
- key = self.construct_object(key_node, deep=deep) # type: ignore
+ key = self.construct_object(key_node, deep=deep)
# we need to check that the key object can be used in a hash table (i.e. is hashable)
try:
- _ = hash(key) # type: ignore
+ _ = hash(key)
except TypeError as exc:
raise ConstructorError(
"while constructing a mapping",
# check for duplicate keys
if key in mapping:
raise DataParsingError(f"duplicate key detected: {key_node.start_mark}")
- value = self.construct_object(value_node, deep=deep) # type: ignore
+ value = self.construct_object(value_node, deep=deep)
mapping[key] = value
return mapping
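# Illustrative use of the duplicate-key check above (assumes the construct_mapping
# shown is installed on _RaiseDuplicatesLoader and that yaml/DataParsingError are
# imported in the surrounding module):
try:
    yaml.load("a: 1\na: 2\n", Loader=_RaiseDuplicatesLoader)
except DataParsingError:
    pass  # a repeated key fails loudly instead of silently keeping the last value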
if self is DataFormat.YAML:
# RaiseDuplicatesLoader extends yaml.SafeLoader, so this should be safe
# https://python.land/data-processing/python-yaml#PyYAML_safe_load_vs_load
- return renamed(yaml.load(text, Loader=_RaiseDuplicatesLoader)) # type: ignore
+ return renamed(yaml.load(text, Loader=_RaiseDuplicatesLoader))
if self is DataFormat.JSON:
return renamed(json.loads(text, object_pairs_hook=_json_raise_duplicates))
raise NotImplementedError(f"Parsing of format '{self}' is not implemented")
data = data.original()
if self is DataFormat.YAML:
- return yaml.safe_dump(data, indent=indent) # type: ignore
+ return yaml.safe_dump(data, indent=indent)
if self is DataFormat.JSON:
return json.dumps(data, indent=indent)
raise NotImplementedError(f"Exporting to '{self}' format is not implemented")
return dict(super().items())
-class RenamedList(List[V], Renamed): # type: ignore
+class RenamedList(List[V], Renamed):
def __getitem__(self, key: Any) -> Any:
res = super().__getitem__(key)
return renamed(res)
origin = getattr(tp, "__origin__", None)
args = get_generic_type_arguments(tp)
- return origin == Union and len(args) == 2 and args[1] == NoneType # type: ignore
+ return origin == Union and len(args) == 2 and args[1] == NoneType
def is_dict(tp: Any) -> bool:
def is_union(tp: Any) -> bool:
"""Return true even for optional types, because they are just a Union[T, NoneType]."""
- return getattr(tp, "__origin__", None) == Union # type: ignore
+ return getattr(tp, "__origin__", None) == Union
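# Standard-library illustration (not project code) of why the checks above work:
# Optional[X] is just Union[X, NoneType], so its __origin__ is Union and NoneType is
# the last element of __args__.
from typing import Optional, Union

assert Optional[int] == Union[int, None]
assert getattr(Optional[int], "__origin__", None) is Union
assert getattr(Optional[int], "__args__", ()) == (int, type(None))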
def is_literal(tp: Any) -> bool: