raw_state = line[-1]
# "enabled_fired"}; -> enabled_fired
- state = raw_state.replace('"', '').replace('};', '').replace(',','_')
+ state = raw_state.replace('"', '').replace('};', '').replace(',', '_')
if state[0:7] == "__init_":
initial_state = state[7:]
else:
nr_state += 1
# declare the matrix....
- matrix = [[ self.invalid_state_str for x in range(nr_event)] for y in range(nr_state)]
+ matrix = [[self.invalid_state_str for x in range(nr_event)] for y in range(nr_state)]
constraints: dict[_ConstraintKey, list[str]] = {}
# and we are back! Let's fill the matrix
while self.__dot_lines[cursor].lstrip()[0] == '"':
if self.__dot_lines[cursor].split()[1] == "->":
line = self.__dot_lines[cursor].split()
- origin_state = line[0].replace('"','').replace(',','_')
- dest_state = line[2].replace('"','').replace(',','_')
+ origin_state = line[0].replace('"', '').replace(',', '_')
+ dest_state = line[2].replace('"', '').replace(',', '_')
possible_events = "".join(line[line.index("label") + 2:-1]).replace('"', '')
for event in possible_events.split("\\n"):
event, *constr = event.split(";")
line += f"\t\t\t{next_state}"
else:
line += f"{next_state:>{maxlen}}"
- if y != nr_events-1:
+ if y != nr_events - 1:
line += ",\n" if linetoolong else ", "
else:
line += ",\n\t\t}," if linetoolong else " },"
tp_args = tp_args_dict[tp_type]
if self._is_id_monitor():
tp_args.insert(0, tp_args_id)
- tp_proto_c = ", ".join([a+b for a,b in tp_args])
- tp_args_c = ", ".join([b for a,b in tp_args])
+ tp_proto_c = ", ".join([a + b for a, b in tp_args])
+ tp_args_c = ", ".join([b for a, b in tp_args])
buff.append(f" TP_PROTO({tp_proto_c}),")
buff.append(f" TP_ARGS({tp_args_c})")
return '\n'.join(buff)
class Monitor(RVGenerator):
- monitor_types = { "global" : 1, "per_cpu" : 2, "per_task" : 3, "per_obj" : 4 }
+ monitor_types = {"global": 1, "per_cpu": 2, "per_task": 3, "per_obj": 4}
def __init__(self, extra_params={}):
super().__init__(extra_params)