Replace the generic except Exception block with a custom AutomataError
class that inherits from Exception. This provides more precise exception
handling for automata parsing and validation errors while avoiding
overly broad exception catches that could mask programming errors like
SyntaxError or TypeError.
The AutomataError class is raised when DOT file processing fails due to
invalid format, I/O errors, or malformed automaton definitions. The
main entry point catches this specific exception and provides a
user-friendly error message to stderr before exiting.
Also, replace generic exceptions raising in HA and LTL with
AutomataError.
Co-authored-by: Gabriele Monaco <gmonaco@redhat.com>
Signed-off-by: Wander Lairson Costa <wander@redhat.com>
Reviewed-by: Gabriele Monaco <gmonaco@redhat.com>
Link: https://lore.kernel.org/r/20260223162407.147003-2-wander@redhat.com
Signed-off-by: Gabriele Monaco <gmonaco@redhat.com>
from rvgen.generator import Monitor
from rvgen.container import Container
from rvgen.ltl2k import ltl2k
+ from rvgen.automata import AutomataError
import argparse
import sys
sys.exit(1)
else:
monitor = Container(vars(params))
- except Exception as e:
- print('Error: '+ str(e))
- print("Sorry : :-(")
+ except AutomataError as e:
+ print(f"There was an error processing {params.spec}: {e}", file=sys.stderr)
sys.exit(1)
print("Writing the monitor into the directory %s" % monitor.name)
def __new__(cls, state_id: int, event_id: int):
return super().__new__(cls, (state_id, event_id))
+class AutomataError(Exception):
+ """Exception raised for errors in automata parsing and validation.
+
+ Raised when DOT file processing fails due to invalid format, I/O errors,
+ or malformed automaton definitions.
+ """
+
class Automata:
"""Automata class: Reads a dot file and part it as an automata.
basename = ntpath.basename(self.__dot_path)
if not basename.endswith(".dot") and not basename.endswith(".gv"):
print("not a dot file")
- raise Exception("not a dot file: %s" % self.__dot_path)
+ raise AutomataError("not a dot file: %s" % self.__dot_path)
model_name = ntpath.splitext(basename)[0]
if model_name.__len__() == 0:
- raise Exception("not a dot file: %s" % self.__dot_path)
+ raise AutomataError("not a dot file: %s" % self.__dot_path)
return model_name
dot_lines = []
try:
dot_file = open(self.__dot_path)
- except:
- raise Exception("Cannot open the file: %s" % self.__dot_path)
+ except OSError as exc:
+ raise AutomataError(exc.strerror) from exc
dot_lines = dot_file.read().splitlines()
dot_file.close()
line = dot_lines[cursor].split()
if (line[0] != "digraph") and (line[1] != "state_automaton"):
- raise Exception("Not a valid .dot format: %s" % self.__dot_path)
+ raise AutomataError("Not a valid .dot format: %s" % self.__dot_path)
else:
cursor += 1
return dot_lines
# For further information, see:
# Documentation/trace/rv/deterministic_automata.rst
-from .automata import Automata
+from .automata import Automata, AutomataError
class Dot2c(Automata):
enum_suffix = ""
min_type = "unsigned int"
if self.states.__len__() > 1000000:
- raise Exception("Too many states: %d" % self.states.__len__())
+ raise AutomataError("Too many states: %d" % self.states.__len__())
return min_type
from collections import deque
from .dot2c import Dot2c
from .generator import Monitor
-from .automata import _EventConstraintKey, _StateConstraintKey
+from .automata import _EventConstraintKey, _StateConstraintKey, AutomataError
class dot2k(Monitor, Dot2c):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.is_hybrid_automata():
- raise ValueError("Detected hybrid automata, use the 'ha' class")
+ raise AutomataError("Detected hybrid automata, use the 'ha' class")
class ha2k(dot2k):
"""Hybrid automata only"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if not self.is_hybrid_automata():
- raise ValueError("Detected deterministic automata, use the 'da' class")
+ raise AutomataError("Detected deterministic automata, use the 'da' class")
self.trace_h = self._read_template_file("trace_hybrid.h")
self.__parse_constraints()
# state constraints are only used for expirations (e.g. clk<N)
if self.is_event_constraint(key):
if not rule and not reset:
- raise ValueError("Unrecognised event constraint "
- f"({self.states[key[0]]}/{self.events[key[1]]}: {constr})")
+ raise AutomataError("Unrecognised event constraint "
+ f"({self.states[key[0]]}/{self.events[key[1]]}: {constr})")
if rule and (rule["env"] in self.env_types and
rule["env"] not in self.env_stored):
- raise ValueError("Clocks in hybrid automata always require a storage"
- f" ({rule["env"]})")
+ raise AutomataError("Clocks in hybrid automata always require a storage"
+ f" ({rule["env"]})")
else:
if not rule:
- raise ValueError("Unrecognised state constraint "
- f"({self.states[key]}: {constr})")
+ raise AutomataError("Unrecognised state constraint "
+ f"({self.states[key]}: {constr})")
if rule["env"] not in self.env_stored:
- raise ValueError("State constraints always require a storage "
- f"({rule["env"]})")
+ raise AutomataError("State constraints always require a storage "
+ f"({rule["env"]})")
if rule["op"] not in ["<", "<="]:
- raise ValueError("State constraints must be clock expirations like"
- f" clk<N ({rule.string})")
+ raise AutomataError("State constraints must be clock expirations like"
+ f" clk<N ({rule.string})")
def __parse_constraints(self) -> None:
self.guards: dict[_EventConstraintKey, str] = {}
raise FileNotFoundError("Could not find the rv directory, do you have the kernel source installed?")
def _read_file(self, path):
- try:
- fd = open(path, 'r')
- except OSError:
- raise Exception("Cannot open the file: %s" % path)
+ fd = open(path, 'r')
content = fd.read()
try:
path = os.path.join(self.abs_template_dir, file)
return self._read_file(path)
- except Exception:
+ except OSError:
# Specific template file not found. Try the generic template file in the template/
# directory, which is one level up
path = os.path.join(self.abs_template_dir, "..", file)
from ply.lex import lex
from ply.yacc import yacc
+from .automata import AutomataError
# Grammar:
# ltl ::= opd | ( ltl ) | ltl binop ltl | unop ltl
t_ignore = ' \t\n'
def t_error(t):
- raise ValueError(f"Illegal character '{t.value[0]}'")
+ raise AutomataError(f"Illegal character '{t.value[0]}'")
lexer = lex()
elif p[1] == "not":
op = NotOp(p[2])
else:
- raise ValueError(f"Invalid unary operator {p[1]}")
+ raise AutomataError(f"Invalid unary operator {p[1]}")
p[0] = ASTNode(op)
elif p[2] == "imply":
op = ImplyOp(p[1], p[3])
else:
- raise ValueError(f"Invalid binary operator {p[2]}")
+ raise AutomataError(f"Invalid binary operator {p[2]}")
p[0] = ASTNode(op)
subexpr[assign[0]] = assign[1]
if rule is None:
- raise ValueError("Please define your specification in the \"RULE = <LTL spec>\" format")
+ raise AutomataError("Please define your specification in the \"RULE = <LTL spec>\" format")
for node in rule:
if not isinstance(node.op, Variable):
from pathlib import Path
from . import generator
from . import ltl2ba
+from .automata import AutomataError
COLUMN_LIMIT = 100
if MonitorType != "per_task":
raise NotImplementedError("Only per_task monitor is supported for LTL")
super().__init__(extra_params)
- with open(file_path) as f:
- self.atoms, self.ba, self.ltl = ltl2ba.create_graph(f.read())
+ try:
+ with open(file_path) as f:
+ self.atoms, self.ba, self.ltl = ltl2ba.create_graph(f.read())
+ except OSError as exc:
+ raise AutomataError(exc.strerror) from exc
self.atoms_abbr = abbreviate_atoms(self.atoms)
self.name = extra_params.get("model_name")
if not self.name: