]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-117348: Refactored RawConfigParser._read for similicity and comprehensibility...
authorJason R. Coombs <jaraco@jaraco.com>
Fri, 29 Mar 2024 20:06:09 +0000 (16:06 -0400)
committerGitHub <noreply@github.com>
Fri, 29 Mar 2024 20:06:09 +0000 (16:06 -0400)
* Extract method for _read_inner, reducing complexity and indentation by 1.

* Extract method for _raise_all and yield ParseErrors from _read_inner.

Reduces complexity by 1 and reduces touch points for handling errors in _read_inner.

* Prefer iterators to splat expansion and literal indexing.

* Extract method for _strip_comments. Reduces complexity by 7.

* Model the file lines in a class to encapsulate the comment status and cleaned value.

* Encapsulate the read state as a dataclass

* Extract _handle_continuation_line and _handle_rest methods. Reduces complexity by 8.

* Reindent

* At least for now, collect errors in the ReadState

* Check for missing section header separately.

* Extract methods for _handle_header and _handle_option. Reduces complexity by 6.

* Remove unreachable code. Reduces complexity by 4.

* Remove unreachable branch

* Handle error condition early. Reduces complexity by 1.

* Add blurb

* Move _raise_all to ParsingError, as its behavior is most closely related to the exception class and not the reader.

* Split _strip* into separate methods.

* Refactor _strip_full to compute the strip just once and use 'not any' to determine the factor.

* Replace use of 'sys.maxsize' with direct computation of the stripped value.

* Extract has_comments as a dynamic property.

* Implement clean as a cached property.

* Model comment prefixes in the RawConfigParser within a prefixes namespace.

* Use a regular expression to search for the first match.

Avoids mutating variables and tricky logic and over-computing all of the starts when only the first is relevant.

Lib/configparser.py
Misc/NEWS.d/next/Library/2024-03-29-12-07-26.gh-issue-117348.WjCYvK.rst [new file with mode: 0644]

index 3040e1fbe5b9c1832ab19d16e950bcd066d66e95..d0326c60e9b9079d479dcaf652781a886b461b25 100644 (file)
@@ -145,12 +145,15 @@ ConfigParser -- responsible for parsing a list of
 
 from collections.abc import MutableMapping
 from collections import ChainMap as _ChainMap
+import contextlib
+from dataclasses import dataclass, field
 import functools
 import io
 import itertools
 import os
 import re
 import sys
+from typing import Iterable
 
 __all__ = ("NoSectionError", "DuplicateOptionError", "DuplicateSectionError",
            "NoOptionError", "InterpolationError", "InterpolationDepthError",
@@ -302,15 +305,33 @@ class InterpolationDepthError(InterpolationError):
 class ParsingError(Error):
     """Raised when a configuration file does not follow legal syntax."""
 
-    def __init__(self, source):
+    def __init__(self, source, *args):
         super().__init__(f'Source contains parsing errors: {source!r}')
         self.source = source
         self.errors = []
         self.args = (source, )
+        if args:
+            self.append(*args)
 
     def append(self, lineno, line):
         self.errors.append((lineno, line))
-        self.message += '\n\t[line %2d]: %s' % (lineno, line)
+        self.message += '\n\t[line %2d]: %s' % (lineno, repr(line))
+
+    def combine(self, others):
+        for other in others:
+            for error in other.errors:
+                self.append(*error)
+        return self
+
+    @staticmethod
+    def _raise_all(exceptions: Iterable['ParsingError']):
+        """
+        Combine any number of ParsingErrors into one and raise it.
+        """
+        exceptions = iter(exceptions)
+        with contextlib.suppress(StopIteration):
+            raise next(exceptions).combine(exceptions)
+
 
 
 class MissingSectionHeaderError(ParsingError):
@@ -517,6 +538,55 @@ class ExtendedInterpolation(Interpolation):
                     "found: %r" % (rest,))
 
 
+@dataclass
+class _ReadState:
+    elements_added : set[str] = field(default_factory=set)
+    cursect : dict[str, str] | None = None
+    sectname : str | None = None
+    optname : str | None = None
+    lineno : int = 0
+    indent_level : int = 0
+    errors : list[ParsingError] = field(default_factory=list)
+
+
+@dataclass
+class _Prefixes:
+    full : Iterable[str]
+    inline : Iterable[str]
+
+
+class _Line(str):
+
+    def __new__(cls, val, *args, **kwargs):
+        return super().__new__(cls, val)
+
+    def __init__(self, val, prefixes: _Prefixes):
+        self.prefixes = prefixes
+
+    @functools.cached_property
+    def clean(self):
+        return self._strip_full() and self._strip_inline()
+
+    @property
+    def has_comments(self):
+        return self.strip() != self.clean
+
+    def _strip_inline(self):
+        """
+        Search for the earliest prefix at the beginning of the line or following a space.
+        """
+        matcher = re.compile(
+            '|'.join(fr'(^|\s)({re.escape(prefix)})' for prefix in self.prefixes.inline)
+            # match nothing if no prefixes
+            or '(?!)'
+        )
+        match = matcher.search(self)
+        return self[:match.start() if match else None].strip()
+
+    def _strip_full(self):
+        return '' if any(map(self.strip().startswith, self.prefixes.full)) else True
+
+
 class RawConfigParser(MutableMapping):
     """ConfigParser that does not do interpolation."""
 
@@ -583,8 +653,10 @@ class RawConfigParser(MutableMapping):
             else:
                 self._optcre = re.compile(self._OPT_TMPL.format(delim=d),
                                           re.VERBOSE)
-        self._comment_prefixes = tuple(comment_prefixes or ())
-        self._inline_comment_prefixes = tuple(inline_comment_prefixes or ())
+        self._prefixes = _Prefixes(
+            full=tuple(comment_prefixes or ()),
+            inline=tuple(inline_comment_prefixes or ()),
+        )
         self._strict = strict
         self._allow_no_value = allow_no_value
         self._empty_lines_in_values = empty_lines_in_values
@@ -975,147 +1047,117 @@ class RawConfigParser(MutableMapping):
         in an otherwise empty line or may be entered in lines holding values or
         section names. Please note that comments get stripped off when reading configuration files.
         """
-        elements_added = set()
-        cursect = None                        # None, or a dictionary
-        sectname = None
-        optname = None
-        lineno = 0
-        indent_level = 0
-        e = None                              # None, or an exception
 
         try:
-            for lineno, line in enumerate(fp, start=1):
-                comment_start = sys.maxsize
-                # strip inline comments
-                inline_prefixes = {p: -1 for p in self._inline_comment_prefixes}
-                while comment_start == sys.maxsize and inline_prefixes:
-                    next_prefixes = {}
-                    for prefix, index in inline_prefixes.items():
-                        index = line.find(prefix, index+1)
-                        if index == -1:
-                            continue
-                        next_prefixes[prefix] = index
-                        if index == 0 or (index > 0 and line[index-1].isspace()):
-                            comment_start = min(comment_start, index)
-                    inline_prefixes = next_prefixes
-                # strip full line comments
-                for prefix in self._comment_prefixes:
-                    if line.strip().startswith(prefix):
-                        comment_start = 0
-                        break
-                if comment_start == sys.maxsize:
-                    comment_start = None
-                value = line[:comment_start].strip()
-                if not value:
-                    if self._empty_lines_in_values:
-                        # add empty line to the value, but only if there was no
-                        # comment on the line
-                        if (comment_start is None and
-                            cursect is not None and
-                            optname and
-                            cursect[optname] is not None):
-                            cursect[optname].append('') # newlines added at join
-                    else:
-                        # empty line marks end of value
-                        indent_level = sys.maxsize
-                    continue
-                # continuation line?
-                first_nonspace = self.NONSPACECRE.search(line)
-                cur_indent_level = first_nonspace.start() if first_nonspace else 0
-                if (cursect is not None and optname and
-                    cur_indent_level > indent_level):
-                    if cursect[optname] is None:
-                        raise MultilineContinuationError(fpname, lineno, line)
-                    cursect[optname].append(value)
-                # a section header or option header?
-                else:
-                    if self._allow_unnamed_section and cursect is None:
-                        sectname = UNNAMED_SECTION
-                        cursect = self._dict()
-                        self._sections[sectname] = cursect
-                        self._proxies[sectname] = SectionProxy(self, sectname)
-                        elements_added.add(sectname)
-
-                    indent_level = cur_indent_level
-                    # is it a section header?
-                    mo = self.SECTCRE.match(value)
-                    if mo:
-                        sectname = mo.group('header')
-                        if sectname in self._sections:
-                            if self._strict and sectname in elements_added:
-                                raise DuplicateSectionError(sectname, fpname,
-                                                            lineno)
-                            cursect = self._sections[sectname]
-                            elements_added.add(sectname)
-                        elif sectname == self.default_section:
-                            cursect = self._defaults
-                        else:
-                            cursect = self._dict()
-                            self._sections[sectname] = cursect
-                            self._proxies[sectname] = SectionProxy(self, sectname)
-                            elements_added.add(sectname)
-                        # So sections can't start with a continuation line
-                        optname = None
-                    # no section header?
-                    elif cursect is None:
-                        raise MissingSectionHeaderError(fpname, lineno, line)
-                        # an option line?
-                    else:
-                        indent_level = cur_indent_level
-                        # is it a section header?
-                        mo = self.SECTCRE.match(value)
-                        if mo:
-                            sectname = mo.group('header')
-                            if sectname in self._sections:
-                                if self._strict and sectname in elements_added:
-                                    raise DuplicateSectionError(sectname, fpname,
-                                                                lineno)
-                                cursect = self._sections[sectname]
-                                elements_added.add(sectname)
-                            elif sectname == self.default_section:
-                                cursect = self._defaults
-                            else:
-                                cursect = self._dict()
-                                self._sections[sectname] = cursect
-                                self._proxies[sectname] = SectionProxy(self, sectname)
-                                elements_added.add(sectname)
-                            # So sections can't start with a continuation line
-                            optname = None
-                        # no section header in the file?
-                        elif cursect is None:
-                            raise MissingSectionHeaderError(fpname, lineno, line)
-                        # an option line?
-                        else:
-                            mo = self._optcre.match(value)
-                            if mo:
-                                optname, vi, optval = mo.group('option', 'vi', 'value')
-                                if not optname:
-                                    e = self._handle_error(e, fpname, lineno, line)
-                                optname = self.optionxform(optname.rstrip())
-                                if (self._strict and
-                                    (sectname, optname) in elements_added):
-                                    raise DuplicateOptionError(sectname, optname,
-                                                            fpname, lineno)
-                                elements_added.add((sectname, optname))
-                                # This check is fine because the OPTCRE cannot
-                                # match if it would set optval to None
-                                if optval is not None:
-                                    optval = optval.strip()
-                                    cursect[optname] = [optval]
-                                else:
-                                    # valueless option handling
-                                    cursect[optname] = None
-                            else:
-                                # a non-fatal parsing error occurred. set up the
-                                # exception but keep going. the exception will be
-                                # raised at the end of the file and will contain a
-                                # list of all bogus lines
-                                e = self._handle_error(e, fpname, lineno, line)
+            ParsingError._raise_all(self._read_inner(fp, fpname))
         finally:
             self._join_multiline_values()
-        # if any parsing errors occurred, raise an exception
-        if e:
-            raise e
+
+    def _read_inner(self, fp, fpname):
+        st = _ReadState()
+
+        Line = functools.partial(_Line, prefixes=self._prefixes)
+        for st.lineno, line in enumerate(map(Line, fp), start=1):
+            if not line.clean:
+                if self._empty_lines_in_values:
+                    # add empty line to the value, but only if there was no
+                    # comment on the line
+                    if (not line.has_comments and
+                        st.cursect is not None and
+                        st.optname and
+                        st.cursect[st.optname] is not None):
+                        st.cursect[st.optname].append('') # newlines added at join
+                else:
+                    # empty line marks end of value
+                    st.indent_level = sys.maxsize
+                continue
+
+            first_nonspace = self.NONSPACECRE.search(line)
+            st.cur_indent_level = first_nonspace.start() if first_nonspace else 0
+
+            if self._handle_continuation_line(st, line, fpname):
+                continue
+
+            self._handle_rest(st, line, fpname)
+
+        return st.errors
+
+    def _handle_continuation_line(self, st, line, fpname):
+        # continuation line?
+        is_continue = (st.cursect is not None and st.optname and
+            st.cur_indent_level > st.indent_level)
+        if is_continue:
+            if st.cursect[st.optname] is None:
+                raise MultilineContinuationError(fpname, st.lineno, line)
+            st.cursect[st.optname].append(line.clean)
+        return is_continue
+
+    def _handle_rest(self, st, line, fpname):
+        # a section header or option header?
+        if self._allow_unnamed_section and st.cursect is None:
+            st.sectname = UNNAMED_SECTION
+            st.cursect = self._dict()
+            self._sections[st.sectname] = st.cursect
+            self._proxies[st.sectname] = SectionProxy(self, st.sectname)
+            st.elements_added.add(st.sectname)
+
+        st.indent_level = st.cur_indent_level
+        # is it a section header?
+        mo = self.SECTCRE.match(line.clean)
+
+        if not mo and st.cursect is None:
+            raise MissingSectionHeaderError(fpname, st.lineno, line)
+
+        self._handle_header(st, mo, fpname) if mo else self._handle_option(st, line, fpname)
+
+    def _handle_header(self, st, mo, fpname):
+        st.sectname = mo.group('header')
+        if st.sectname in self._sections:
+            if self._strict and st.sectname in st.elements_added:
+                raise DuplicateSectionError(st.sectname, fpname,
+                                            st.lineno)
+            st.cursect = self._sections[st.sectname]
+            st.elements_added.add(st.sectname)
+        elif st.sectname == self.default_section:
+            st.cursect = self._defaults
+        else:
+            st.cursect = self._dict()
+            self._sections[st.sectname] = st.cursect
+            self._proxies[st.sectname] = SectionProxy(self, st.sectname)
+            st.elements_added.add(st.sectname)
+        # So sections can't start with a continuation line
+        st.optname = None
+
+    def _handle_option(self, st, line, fpname):
+        # an option line?
+        st.indent_level = st.cur_indent_level
+
+        mo = self._optcre.match(line.clean)
+        if not mo:
+            # a non-fatal parsing error occurred. set up the
+            # exception but keep going. the exception will be
+            # raised at the end of the file and will contain a
+            # list of all bogus lines
+            st.errors.append(ParsingError(fpname, st.lineno, line))
+            return
+
+        st.optname, vi, optval = mo.group('option', 'vi', 'value')
+        if not st.optname:
+            st.errors.append(ParsingError(fpname, st.lineno, line))
+        st.optname = self.optionxform(st.optname.rstrip())
+        if (self._strict and
+            (st.sectname, st.optname) in st.elements_added):
+            raise DuplicateOptionError(st.sectname, st.optname,
+                                    fpname, st.lineno)
+        st.elements_added.add((st.sectname, st.optname))
+        # This check is fine because the OPTCRE cannot
+        # match if it would set optval to None
+        if optval is not None:
+            optval = optval.strip()
+            st.cursect[st.optname] = [optval]
+        else:
+            # valueless option handling
+            st.cursect[st.optname] = None
 
     def _join_multiline_values(self):
         defaults = self.default_section, self._defaults
@@ -1135,12 +1177,6 @@ class RawConfigParser(MutableMapping):
         for key, value in defaults.items():
             self._defaults[self.optionxform(key)] = value
 
-    def _handle_error(self, exc, fpname, lineno, line):
-        if not exc:
-            exc = ParsingError(fpname)
-        exc.append(lineno, repr(line))
-        return exc
-
     def _unify_values(self, section, vars):
         """Create a sequence of lookups with 'vars' taking priority over
         the 'section' which takes priority over the DEFAULTSECT.
diff --git a/Misc/NEWS.d/next/Library/2024-03-29-12-07-26.gh-issue-117348.WjCYvK.rst b/Misc/NEWS.d/next/Library/2024-03-29-12-07-26.gh-issue-117348.WjCYvK.rst
new file mode 100644 (file)
index 0000000..cd3006c
--- /dev/null
@@ -0,0 +1,2 @@
+Refactored :meth:`configparser.RawConfigParser._read` to reduce cyclometric
+complexity and improve comprehensibility.