]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#1723] tools/check-messages.py
authorAndrei Pavel <andrei@isc.org>
Tue, 25 Jun 2024 21:38:26 +0000 (00:38 +0300)
committerAndrei Pavel <andrei@isc.org>
Tue, 9 Jul 2024 10:49:31 +0000 (13:49 +0300)
.gitlab-ci.yml
src/lib/dhcpsrv/fuzz_messages.mes
tools/check-messages.py [new file with mode: 0755]
tools/find-uninstalled-headers.py

index 9e2d292c9c8e349de8c803660ad6b6f76690afc1..af49a62e435bc5a049858cf613892989e1cfb858 100644 (file)
@@ -49,7 +49,7 @@ uninstalled-headers:
 messages:
   stage: test
   script:
-    - ./tools/check-messages.sh
+    - ./tools/check-messages.py
 
 missing-api-commands:
   stage: test
index f4f43839a93f4cbf6a188b5edff67dc14a49fa0a..a761cf52ce548058c15242e091d630769284aa09 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (C) 2015-2019 Internet Systems Consortium, Inc. ("ISC")
+# Copyright (C) 2015-2024 Internet Systems Consortium, Inc. ("ISC")
 #
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
diff --git a/tools/check-messages.py b/tools/check-messages.py
new file mode 100755 (executable)
index 0000000..c7e4800
--- /dev/null
@@ -0,0 +1,318 @@
+#!/usr/bin/env python3
+
+# Copyright (C) 2024 Internet Systems Consortium, Inc. ("ISC")
+#
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+import argparse
+import os
+import pathlib
+import re
+import subprocess
+import sys
+
+
+USAGE = """
+This script does several verifications regarding logged messages:
+1. Checks that messages are logged only once (outside of an exhonerated list).
+2. Checks that no two messages share the same id.
+3. Checks that there are no unlogged/unused messages.
+4. Removes all occurences of unused messages (when run with -a).
+5. Checks that the debug log level is correctly logged in the message documentation.
+6. Automatically adds or fixes the debug log level in the message documentation (when run with -a).
+7. Checks that the placeholder ids are consecutive, starting with 1, and unique in the same message definition.
+"""
+
+
+def check_duplicate_occurences(occurences):
+    failure = False
+    duplicate_occurences = {k: v for k, v in occurences.items() if v > 1}
+    if len(duplicate_occurences) > 1:
+        print('Duplicate occurences found:')
+        for k, v in duplicate_occurences.items():
+            print(f'    % {k}: {v}')
+        failure = True
+    return failure
+
+
+def check_unlogged_messages(messages, autofix):
+    all_source_files = set(pathlib.Path('.').glob('**/*.cc')) \
+        - set(pathlib.Path('.').glob('**/*messages.cc')) \
+        | set(pathlib.Path('.').glob('**/*.h')) \
+        - set(pathlib.Path('.').glob('**/*messages.h'))
+    all_source_code = ''
+    for file in all_source_files:
+        with open(file, 'r', encoding='utf-8') as f:
+            all_source_code += f.read()
+    failure = False
+    for message_id in messages:
+        if message_id not in all_source_code:
+            if not failure:  # in other words: if first
+                print('Unlogged messages found:')
+                failure = True
+            print(f'    % {message_id}')
+            if autofix:
+                remove_message_definition(message_id, messages[message_id]['file'])
+                print('        ^ autofixed')
+    return failure
+
+
+# This function is deprecated. Replaced by check_unlogged_messages.
+# Messages can appear outside LOG_* function calls.
+# So checking occurences is not enough.
+def check_unlogged_messages_based_on_occurences(messages, occurences, autofix):
+    failure = False
+    for message_id in messages:
+        if message_id not in occurences:
+            if not failure:  # in other words: if first
+                print('Unlogged messages found:')
+                failure = True
+            print(f'    % {message_id}')
+            if autofix:
+                remove_message_definition(message_id, messages[message_id]['file'])
+                print('        ^ autofixed')
+    return failure
+
+
+def check_that_debug_log_levels_are_documented(messages, debug_levels, log_lines, autofix):
+    failure = False
+    for message_id, message in messages.items():
+        log_level = None
+        for line in log_lines:
+            if line.startswith('LOG_DEBUG') and message_id in line:
+                log_level = line.split(',')[1].strip().replace('isc::log::', '').replace('log::', '')
+                break
+        if log_level is None:
+            continue
+        if not log_level.isdigit():
+            log_level = debug_levels[log_level]
+        if isinstance(log_level, str):
+            log_level = int(log_level)
+        if not isinstance(log_level, int):
+            print(f'Could not determine numerical log level of {message_id}. Supposedly {log_level}?')
+            failure = True
+        if message['debug_log_level_line'] != f'Logged at debug log level {log_level}.':
+            if not failure:  # in other words: if first
+                print('Messages that do not document their debug log levels:')
+                failure = True
+            print(f'    % {message_id}: {message["debug_log_level_line"]}')
+            print(f'        % It should be: Logged at debug log level {log_level}.')
+            if autofix:
+                file = message['file']
+                # If line is already there, remove it.
+                if message['debug_log_level_line'].startswith('Logged at debug log level '):
+                    line_number = run(fr'grep -En "\b{message_id}\b" "{file}" | cut -d ":" -f 1')
+                    line_number = int(line_number) + 1
+                    run(f'sed "{line_number}d" "{file}" > "{file}.tmp"')
+                    run(f'mv "{file}.tmp" "{file}"')
+                # And add the right one.
+                run(f'sed "/^% {message_id} /a Logged at debug log level {log_level}." "{file}" > "{file}.tmp"')
+                run(f'mv "{file}.tmp" "{file}"')
+                print('            ^ autofixed')
+    return failure
+
+
+def check_placeholder_ids(messages):
+    failure = False
+    placeholder_id_pattern = re.compile('(%[0-9]+)')
+    for message_id in messages:
+        text = messages[message_id]['text']
+        matches = placeholder_id_pattern.search(text)
+        if matches is not None:
+            for i in range(len(matches.groups())):
+                match = matches.group(i)
+                if match != f'%{i + 1}':
+                    print(f'Expected %{i + 1} but found {match} for message:')
+                    print(f'    % {message_id} {text}')
+                    failure = True
+    return failure
+
+
+def remove_message_definition(message, file):
+    new_lines = []
+    removing = False
+    with open(file, 'r', encoding='utf-8') as f:
+        lines = f.read().splitlines()
+        for line in lines:
+            if line.startswith(f'% {message}'):
+                removing = True
+            elif not removing:
+                new_lines.append(line)
+            elif len(line) == 0:
+                removing = False
+    with open(file, 'w', encoding='utf-8') as f:
+        for i in new_lines:
+            f.write(i)
+            f.write('\n')
+
+
+def run(command):
+    ''' Executes a shell command and returns its output.
+
+    :param command: the command to be executed
+    :type command: str
+
+    :return: the standard output from the command
+    :type: str
+    '''
+    if 'DEBUG' in os.environ:
+        print(f'> {command}')
+    # Issue: [B602:subprocess_popen_with_shell_equals_true] subprocess call with shell=True identified, security
+    #        issue.
+    with subprocess.Popen(command, encoding='utf-8', shell=True,  # nosec B602
+                          stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p:
+        output, error = p.communicate()
+    if error:
+        print('ERROR:', error, file=sys.stderr)
+        sys.exit(2)
+    return output.strip()
+
+
+def main():
+    # Parse parameters.
+    parser = argparse.ArgumentParser(description=USAGE,
+                                     formatter_class=argparse.RawTextHelpFormatter)
+    parser.add_argument('-a', '--autofix', action='store_true',
+                        help='Autofix unused messages and debug log levels in docs.')
+    args = parser.parse_args()
+
+    # Initializations
+    failure = False
+    debug_levels = {}
+    log_lines = []
+    messages = {}
+    occurences = {}
+    debug_level_pattern = re.compile(r'^(extern |)const int (.*DBG.*) =(.*)$')
+    message_id_pattern = re.compile(r'^% (\w+) (.*)')
+    log_pattern = re.compile(r'\b(LOG_DEBUG|LOG_ERROR|LOG_FATAL|LOG_INFO|LOG_WARN)\(')
+
+    # Process .mes files.
+    mes_files = sorted(pathlib.Path('.').glob('**/*.mes'))
+    for mes_file in mes_files:
+        with open(mes_file, 'r', encoding='utf-8') as f:
+            current_message_id = None
+            lines = f.read().splitlines()
+            for line in lines:
+                if len(line) == 0:
+                    current_message_id = None
+
+                message_id_matches = message_id_pattern.search(line)
+                if message_id_matches is None:
+                    # Could be message description.
+                    if current_message_id is not None:
+                        if 'debug_log_level_line' in messages[current_message_id]:
+                            # If debug log level line is already there, that's all that concerns us, so unset
+                            # current_message_id so that we ignore future lines.
+                            current_message_id = None
+                        else:
+                            messages[current_message_id]['debug_log_level_line'] = line
+                else:
+                    # Message definition
+                    message_id = message_id_matches.group(1)
+                    message_text = message_id_matches.group(2)
+
+                    # 2. Checks that no two messages share the same id.
+                    if message_id in messages:
+                        print(f'Duplicate message id definition: {message_id}. Check in both core and premium.')
+                        failure = True
+
+                    current_message_id = message_id
+                    messages[message_id] = {
+                        'file': mes_file,
+                        'text': message_text,
+                    }
+
+    # Process .cc and .h files.
+    cc_files = sorted(pathlib.Path('.').glob('**/*.cc'))
+    h_files = sorted(pathlib.Path('.').glob('**/*.h'))
+    cpp_files = cc_files + h_files
+    for cpp_file in cpp_files:
+        with open(cpp_file, 'r', encoding='utf-8') as f:
+            lines = f.read().splitlines()
+            current_log_line = ''
+            current_debug_level = ''
+            for line in lines:
+                line = line.strip()
+
+                if len(current_debug_level) != 0:
+                    debug_levels[current_debug_level] = (line.strip().strip().rstrip(';')
+                                                         .replace('isc::log::', '').replace('log::', ''))
+                    current_debug_level = ''
+
+                matches = debug_level_pattern.search(line)
+                if matches is not None:
+                    level = matches.group(2)
+                    reference = matches.group(3)
+                    if level is not None:
+                        if len(reference) == 0:
+                            current_debug_level = level
+                        else:
+                            debug_levels[level] = (reference.strip().rstrip(';')
+                                                   .replace('isc::log::', '').replace('log::', ''))
+
+                if len(current_log_line) == 0:
+                    matches = log_pattern.search(line)
+                    if matches is not None:
+                        if not line.startswith('//'):
+                            current_log_line = line
+                    else:
+                        continue
+                else:
+                    current_log_line += line.strip()
+                if current_log_line.endswith(';'):
+                    log_lines.append(current_log_line)
+                    current_log_line = ''
+    log_lines = sorted(log_lines)
+
+    # Resolve all debug_levels to numbers.
+    finished = False
+    while not finished:
+        finished = True
+        for level, reference in debug_levels.items():
+            if reference.isdigit():
+                continue
+            for i in reference.split(' '):
+                if i in debug_levels:
+                    if debug_levels[i].isdigit():
+                        debug_levels[level] = debug_levels[level].replace(i, debug_levels[i])
+                        finished = False
+            if finished:  # in other words, if no replacement was done, so if all replacements were already done, then:
+                debug_levels[level] = eval(reference)  # pylint: disable=eval-used
+    for level in debug_levels:
+        debug_levels[level] = int(debug_levels[level])
+
+    # Get number of occurences for each message id.
+    for line in log_lines:
+        pos = 1
+        if line.split('(')[0] == 'LOG_DEBUG':
+            pos = 2
+        message_id = line.split(',')[pos]
+        message_id = message_id.split(')')[0]
+        message_id = message_id.strip()
+        if message_id in occurences:
+            occurences[message_id] += 1
+        else:
+            occurences[message_id] = 1
+
+    # 1. Checks that messages are logged only once.
+    failure |= check_duplicate_occurences(occurences)
+
+    # 3. Checks that there are no unlogged/unused messages.
+    # 4. Removes all occurences of unused messages (when run with -a).
+    failure |= check_unlogged_messages(messages, args.autofix)
+
+    # 5. Checks that the debug log level is correctly logged in the message documentation.
+    # 6. Automatically adds or fixes the debug log level in the message documentation (when run with -a).
+    failure |= check_that_debug_log_levels_are_documented(messages, debug_levels, log_lines, args.autofix)
+
+    # 7. Checks that the placeholder ids are consecutive, starting with 1, and unique in the same message definition.
+    failure |= check_placeholder_ids(messages)
+
+    if failure:
+        sys.exit(1)
+
+
+if __name__ == '__main__':
+    main()
index 351f045adeddc64a52eb0738fe129493b20235b4..e1f42e55988f31eac01fa2abef36644c04d044b1 100755 (executable)
@@ -27,7 +27,7 @@ def main():
     failure = False
 
     for makefile_am in makefile_ams:
-        with open(makefile_am, 'r', encoding="utf8") as f:
+        with open(makefile_am, 'r', encoding='utf-8') as f:
             lines = f.readlines()
             in_headers_block = False
             for line in lines: