--- /dev/null
+"""Support for documenting audit events."""
+
+from __future__ import annotations
+
+import re
+from typing import TYPE_CHECKING
+
+from docutils import nodes
+from sphinx.errors import NoUri
+from sphinx.locale import _ as sphinx_gettext
+from sphinx.transforms.post_transforms import SphinxPostTransform
+from sphinx.util import logging
+from sphinx.util.docutils import SphinxDirective
+
+if TYPE_CHECKING:
+ from collections.abc import Iterator
+
+ from sphinx.application import Sphinx
+ from sphinx.builders import Builder
+ from sphinx.environment import BuildEnvironment
+
+logger = logging.getLogger(__name__)
+
+# This list of sets are allowable synonyms for event argument names.
+# If two names are in the same set, they are treated as equal for the
+# purposes of warning. This won't help if the number of arguments is
+# different!
+_SYNONYMS = [
+ frozenset({"file", "path", "fd"}),
+]
+
+
+class AuditEvents:
+ def __init__(self) -> None:
+ self.events: dict[str, list[str]] = {}
+ self.sources: dict[str, list[tuple[str, str]]] = {}
+
+ def __iter__(self) -> Iterator[tuple[str, list[str], tuple[str, str]]]:
+ for name, args in self.events.items():
+ for source in self.sources[name]:
+ yield name, args, source
+
+ def add_event(
+ self, name, args: list[str], source: tuple[str, str]
+ ) -> None:
+ if name in self.events:
+ self._check_args_match(name, args)
+ else:
+ self.events[name] = args
+ self.sources.setdefault(name, []).append(source)
+
+ def _check_args_match(self, name: str, args: list[str]) -> None:
+ current_args = self.events[name]
+ msg = (
+ f"Mismatched arguments for audit-event {name}: "
+ f"{current_args!r} != {args!r}"
+ )
+ if current_args == args:
+ return
+ if len(current_args) != len(args):
+ logger.warning(msg)
+ return
+ for a1, a2 in zip(current_args, args, strict=False):
+ if a1 == a2:
+ continue
+ if any(a1 in s and a2 in s for s in _SYNONYMS):
+ continue
+ logger.warning(msg)
+ return
+
+ def id_for(self, name) -> str:
+ source_count = len(self.sources.get(name, ()))
+ name_clean = re.sub(r"\W", "_", name)
+ return f"audit_event_{name_clean}_{source_count}"
+
+ def rows(self) -> Iterator[tuple[str, list[str], list[tuple[str, str]]]]:
+ for name in sorted(self.events.keys()):
+ yield name, self.events[name], self.sources[name]
+
+
+def initialise_audit_events(app: Sphinx) -> None:
+ """Initialise the audit_events attribute on the environment."""
+ if not hasattr(app.env, "audit_events"):
+ app.env.audit_events = AuditEvents()
+
+
+def audit_events_purge(
+ app: Sphinx, env: BuildEnvironment, docname: str
+) -> None:
+ """This is to remove traces of removed documents from env.audit_events."""
+ fresh_audit_events = AuditEvents()
+ for name, args, (doc, target) in env.audit_events:
+ if doc != docname:
+ fresh_audit_events.add_event(name, args, (doc, target))
+
+
+def audit_events_merge(
+ app: Sphinx,
+ env: BuildEnvironment,
+ docnames: list[str],
+ other: BuildEnvironment,
+) -> None:
+ """In Sphinx parallel builds, this merges audit_events from subprocesses."""
+ for name, args, source in other.audit_events:
+ env.audit_events.add_event(name, args, source)
+
+
+class AuditEvent(SphinxDirective):
+ has_content = True
+ required_arguments = 1
+ optional_arguments = 2
+ final_argument_whitespace = True
+
+ _label = [
+ sphinx_gettext(
+ "Raises an :ref:`auditing event <auditing>` "
+ "{name} with no arguments."
+ ),
+ sphinx_gettext(
+ "Raises an :ref:`auditing event <auditing>` "
+ "{name} with argument {args}."
+ ),
+ sphinx_gettext(
+ "Raises an :ref:`auditing event <auditing>` "
+ "{name} with arguments {args}."
+ ),
+ ]
+
+ def run(self) -> list[nodes.paragraph]:
+ name = self.arguments[0]
+ if len(self.arguments) >= 2 and self.arguments[1]:
+ args = [
+ arg
+ for argument in self.arguments[1].strip("'\"").split(",")
+ if (arg := argument.strip())
+ ]
+ else:
+ args = []
+ ids = []
+ try:
+ target = self.arguments[2].strip("\"'")
+ except (IndexError, TypeError):
+ target = None
+ if not target:
+ target = self.env.audit_events.id_for(name)
+ ids.append(target)
+ self.env.audit_events.add_event(name, args, (self.env.docname, target))
+
+ node = nodes.paragraph("", classes=["audit-hook"], ids=ids)
+ self.set_source_info(node)
+ if self.content:
+ self.state.nested_parse(self.content, self.content_offset, node)
+ else:
+ num_args = min(2, len(args))
+ text = self._label[num_args].format(
+ name=f"``{name}``",
+ args=", ".join(f"``{a}``" for a in args),
+ )
+ parsed, messages = self.state.inline_text(text, self.lineno)
+ node += parsed
+ node += messages
+ return [node]
+
+
+class audit_event_list(nodes.General, nodes.Element): # noqa: N801
+ pass
+
+
+class AuditEventListDirective(SphinxDirective):
+ def run(self) -> list[audit_event_list]:
+ return [audit_event_list()]
+
+
+class AuditEventListTransform(SphinxPostTransform):
+ default_priority = 500
+
+ def run(self) -> None:
+ if self.document.next_node(audit_event_list) is None:
+ return
+
+ table = self._make_table(self.app.builder, self.env.docname)
+ for node in self.document.findall(audit_event_list):
+ node.replace_self(table)
+
+ def _make_table(self, builder: Builder, docname: str) -> nodes.table:
+ table = nodes.table(cols=3)
+ group = nodes.tgroup(
+ "",
+ nodes.colspec(colwidth=30),
+ nodes.colspec(colwidth=55),
+ nodes.colspec(colwidth=15),
+ cols=3,
+ )
+ head = nodes.thead()
+ body = nodes.tbody()
+
+ table += group
+ group += head
+ group += body
+
+ head += nodes.row(
+ "",
+ nodes.entry("", nodes.paragraph("", "Audit event")),
+ nodes.entry("", nodes.paragraph("", "Arguments")),
+ nodes.entry("", nodes.paragraph("", "References")),
+ )
+
+ for name, args, sources in builder.env.audit_events.rows():
+ body += self._make_row(builder, docname, name, args, sources)
+
+ return table
+
+ @staticmethod
+ def _make_row(
+ builder: Builder,
+ docname: str,
+ name: str,
+ args: list[str],
+ sources: list[tuple[str, str]],
+ ) -> nodes.row:
+ row = nodes.row()
+ name_node = nodes.paragraph("", nodes.Text(name))
+ row += nodes.entry("", name_node)
+
+ args_node = nodes.paragraph()
+ for arg in args:
+ args_node += nodes.literal(arg, arg)
+ args_node += nodes.Text(", ")
+ if len(args_node.children) > 0:
+ args_node.children.pop() # remove trailing comma
+ row += nodes.entry("", args_node)
+
+ backlinks_node = nodes.paragraph()
+ backlinks = enumerate(sorted(set(sources)), start=1)
+ for i, (doc, label) in backlinks:
+ if isinstance(label, str):
+ ref = nodes.reference("", f"[{i}]", internal=True)
+ try:
+ target = (
+ f"{builder.get_relative_uri(docname, doc)}#{label}"
+ )
+ except NoUri:
+ continue
+ else:
+ ref["refuri"] = target
+ backlinks_node += ref
+ row += nodes.entry("", backlinks_node)
+ return row
+
+
+def setup(app: Sphinx):
+ app.add_directive("audit-event", AuditEvent)
+ app.add_directive("audit-event-table", AuditEventListDirective)
+ app.add_post_transform(AuditEventListTransform)
+ app.connect("builder-inited", initialise_audit_events)
+ app.connect("env-purge-doc", audit_events_purge)
+ app.connect("env-merge-info", audit_events_merge)
+ return {
+ "version": "1.0",
+ "parallel_read_safe": True,
+ "parallel_write_safe": True,
+ }
from time import asctime
from pprint import pformat
-import sphinx
from docutils import nodes
from docutils.io import StringOutput
from docutils.parsers.rst import directives
from sphinx.builders import Builder
from sphinx.domains.changeset import VersionChange, versionlabels, versionlabel_classes
from sphinx.domains.python import PyFunction, PyMethod, PyModule
-from sphinx.errors import NoUri
from sphinx.locale import _ as sphinx_gettext
from sphinx.util import logging
from sphinx.util.docutils import SphinxDirective
return platforms
-# Support for documenting audit event
-
-def audit_events_purge(app, env, docname):
- """This is to remove from env.all_audit_events old traces of removed
- documents.
- """
- if not hasattr(env, 'all_audit_events'):
- return
- fresh_all_audit_events = {}
- for name, event in env.all_audit_events.items():
- event["source"] = [(d, t) for d, t in event["source"] if d != docname]
- if event["source"]:
- # Only keep audit_events that have at least one source.
- fresh_all_audit_events[name] = event
- env.all_audit_events = fresh_all_audit_events
-
-
-def audit_events_merge(app, env, docnames, other):
- """In Sphinx parallel builds, this merges env.all_audit_events from
- subprocesses.
-
- all_audit_events is a dict of names, with values like:
- {'source': [(docname, target), ...], 'args': args}
- """
- if not hasattr(other, 'all_audit_events'):
- return
- if not hasattr(env, 'all_audit_events'):
- env.all_audit_events = {}
- for name, value in other.all_audit_events.items():
- if name in env.all_audit_events:
- env.all_audit_events[name]["source"].extend(value["source"])
- else:
- env.all_audit_events[name] = value
-
-
-class AuditEvent(SphinxDirective):
-
- has_content = True
- required_arguments = 1
- optional_arguments = 2
- final_argument_whitespace = True
-
- _label = [
- sphinx_gettext("Raises an :ref:`auditing event <auditing>` {name} with no arguments."),
- sphinx_gettext("Raises an :ref:`auditing event <auditing>` {name} with argument {args}."),
- sphinx_gettext("Raises an :ref:`auditing event <auditing>` {name} with arguments {args}."),
- ]
-
- @property
- def logger(self):
- cls = type(self)
- return logging.getLogger(cls.__module__ + "." + cls.__name__)
-
- def run(self):
- name = self.arguments[0]
- if len(self.arguments) >= 2 and self.arguments[1]:
- args = (a.strip() for a in self.arguments[1].strip("'\"").split(","))
- args = [a for a in args if a]
- else:
- args = []
-
- label = self._label[min(2, len(args))]
- text = label.format(name="``{}``".format(name),
- args=", ".join("``{}``".format(a) for a in args if a))
-
- if not hasattr(self.env, 'all_audit_events'):
- self.env.all_audit_events = {}
-
- new_info = {
- 'source': [],
- 'args': args
- }
- info = self.env.all_audit_events.setdefault(name, new_info)
- if info is not new_info:
- if not self._do_args_match(info['args'], new_info['args']):
- self.logger.warning(
- "Mismatched arguments for audit-event {}: {!r} != {!r}"
- .format(name, info['args'], new_info['args'])
- )
-
- ids = []
- try:
- target = self.arguments[2].strip("\"'")
- except (IndexError, TypeError):
- target = None
- if not target:
- target = "audit_event_{}_{}".format(
- re.sub(r'\W', '_', name),
- len(info['source']),
- )
- ids.append(target)
-
- info['source'].append((self.env.docname, target))
-
- pnode = nodes.paragraph(text, classes=["audit-hook"], ids=ids)
- pnode.line = self.lineno
- if self.content:
- self.state.nested_parse(self.content, self.content_offset, pnode)
- else:
- n, m = self.state.inline_text(text, self.lineno)
- pnode.extend(n + m)
-
- return [pnode]
-
- # This list of sets are allowable synonyms for event argument names.
- # If two names are in the same set, they are treated as equal for the
- # purposes of warning. This won't help if number of arguments is
- # different!
- _SYNONYMS = [
- {"file", "path", "fd"},
- ]
-
- def _do_args_match(self, args1, args2):
- if args1 == args2:
- return True
- if len(args1) != len(args2):
- return False
- for a1, a2 in zip(args1, args2):
- if a1 == a2:
- continue
- if any(a1 in s and a2 in s for s in self._SYNONYMS):
- continue
- return False
- return True
-
-
-class audit_event_list(nodes.General, nodes.Element):
- pass
-
-
-class AuditEventListDirective(SphinxDirective):
-
- def run(self):
- return [audit_event_list('')]
-
-
# Support for documenting decorators
class PyDecoratorMixin(object):
return sig
-def process_audit_events(app, doctree, fromdocname):
- for node in doctree.findall(audit_event_list):
- break
- else:
- return
-
- env = app.builder.env
-
- table = nodes.table(cols=3)
- group = nodes.tgroup(
- '',
- nodes.colspec(colwidth=30),
- nodes.colspec(colwidth=55),
- nodes.colspec(colwidth=15),
- cols=3,
- )
- head = nodes.thead()
- body = nodes.tbody()
-
- table += group
- group += head
- group += body
-
- row = nodes.row()
- row += nodes.entry('', nodes.paragraph('', nodes.Text('Audit event')))
- row += nodes.entry('', nodes.paragraph('', nodes.Text('Arguments')))
- row += nodes.entry('', nodes.paragraph('', nodes.Text('References')))
- head += row
-
- for name in sorted(getattr(env, "all_audit_events", ())):
- audit_event = env.all_audit_events[name]
-
- row = nodes.row()
- node = nodes.paragraph('', nodes.Text(name))
- row += nodes.entry('', node)
-
- node = nodes.paragraph()
- for i, a in enumerate(audit_event['args']):
- if i:
- node += nodes.Text(", ")
- node += nodes.literal(a, nodes.Text(a))
- row += nodes.entry('', node)
-
- node = nodes.paragraph()
- backlinks = enumerate(sorted(set(audit_event['source'])), start=1)
- for i, (doc, label) in backlinks:
- if isinstance(label, str):
- ref = nodes.reference("", nodes.Text("[{}]".format(i)), internal=True)
- try:
- ref['refuri'] = "{}#{}".format(
- app.builder.get_relative_uri(fromdocname, doc),
- label,
- )
- except NoUri:
- continue
- node += ref
- row += nodes.entry('', node)
-
- body += row
-
- for node in doctree.findall(audit_event_list):
- node.replace_self(table)
-
-
def patch_pairindextypes(app, _env) -> None:
"""Remove all entries from ``pairindextypes`` before writing POT files.
app.add_role('gh', gh_issue_role)
app.add_directive('impl-detail', ImplementationDetail)
app.add_directive('availability', Availability)
- app.add_directive('audit-event', AuditEvent)
- app.add_directive('audit-event-table', AuditEventListDirective)
app.add_directive('deprecated-removed', DeprecatedRemoved)
app.add_builder(PydocTopicsBuilder)
app.add_object_type('opcode', 'opcode', '%s (opcode)', parse_opcode_signature)
app.add_directive_to_domain('py', 'abstractmethod', PyAbstractMethod)
app.add_directive('miscnews', MiscNews)
app.connect('env-check-consistency', patch_pairindextypes)
- app.connect('doctree-resolved', process_audit_events)
- app.connect('env-merge-info', audit_events_merge)
- app.connect('env-purge-doc', audit_events_purge)
return {'version': '1.0', 'parallel_read_safe': True}