return obj.workers
def _validate(self) -> None:
+ try:
+ cpu_count = _cpu_count()
+ if int(self.workers) > 10 * cpu_count:
+ raise ValueError("refusing to run with more then instances 10 instances per cpu core")
+ except DataException:
+ # sometimes, we won't be able to get information about the cpu count
+ pass
+
if self.watchdog and self.backend not in ["auto", "systemd"]:
raise ValueError("'watchdog' can only be configured for 'systemd' backend")
super().__init__(source_value)
if isinstance(source_value, int) and not isinstance(source_value, bool):
if hasattr(self, "_min") and (source_value < self._min):
- raise SchemaException(
- f"value {source_value} is lower than the minimum {self._min}.", object_path
- )
+ raise SchemaException(f"value {source_value} is lower than the minimum {self._min}.", object_path)
if hasattr(self, "_max") and (source_value > self._max):
- raise SchemaException(
- f"value {source_value} is higher than the maximum {self._max}", object_path
- )
+ raise SchemaException(f"value {source_value} is higher than the maximum {self._max}", object_path)
self._value = source_value
else:
raise SchemaException(