]> git.ipfire.org Git - thirdparty/paperless-ngx.git/commitdiff
Chore: refactor workflows code (#11563)
authorshamoon <4887959+shamoon@users.noreply.github.com>
Thu, 11 Dec 2025 20:13:10 +0000 (12:13 -0800)
committerGitHub <noreply@github.com>
Thu, 11 Dec 2025 20:13:10 +0000 (12:13 -0800)
src/documents/signals/handlers.py
src/documents/tasks.py
src/documents/tests/test_workflows.py
src/documents/workflows/__init__.py [new file with mode: 0644]
src/documents/workflows/actions.py [new file with mode: 0644]
src/documents/workflows/mutations.py [new file with mode: 0644]
src/documents/workflows/utils.py [new file with mode: 0644]
src/documents/workflows/webhooks.py [new file with mode: 0644]

index e410b54e200e61e8b105952b1df4fe3941386342..6cafbc1f8a92eab080dd9fb56873b3d0c5b8a11e 100644 (file)
@@ -1,14 +1,10 @@
 from __future__ import annotations
 
-import ipaddress
 import logging
 import shutil
-import socket
 from pathlib import Path
 from typing import TYPE_CHECKING
-from urllib.parse import urlparse
 
-import httpx
 from celery import shared_task
 from celery import states
 from celery.signals import before_task_publish
@@ -27,20 +23,15 @@ from django.db.models import Q
 from django.dispatch import receiver
 from django.utils import timezone
 from filelock import FileLock
-from guardian.shortcuts import remove_perm
 
 from documents import matching
 from documents.caching import clear_document_caches
 from documents.file_handling import create_source_path_directory
 from documents.file_handling import delete_empty_directories
 from documents.file_handling import generate_unique_filename
-from documents.mail import EmailAttachment
-from documents.mail import send_email
-from documents.models import Correspondent
 from documents.models import CustomField
 from documents.models import CustomFieldInstance
 from documents.models import Document
-from documents.models import DocumentType
 from documents.models import MatchingModel
 from documents.models import PaperlessTask
 from documents.models import SavedView
@@ -51,8 +42,14 @@ from documents.models import WorkflowAction
 from documents.models import WorkflowRun
 from documents.models import WorkflowTrigger
 from documents.permissions import get_objects_for_user_owner_aware
-from documents.permissions import set_permissions_for_object
-from documents.templating.workflows import parse_w_workflow_placeholders
+from documents.workflows.actions import build_workflow_action_context
+from documents.workflows.actions import execute_email_action
+from documents.workflows.actions import execute_webhook_action
+from documents.workflows.mutations import apply_assignment_to_document
+from documents.workflows.mutations import apply_assignment_to_overrides
+from documents.workflows.mutations import apply_removal_to_document
+from documents.workflows.mutations import apply_removal_to_overrides
+from documents.workflows.utils import get_workflows_for_trigger
 
 if TYPE_CHECKING:
     from documents.classifier import DocumentClassifier
@@ -673,92 +670,6 @@ def run_workflows_updated(sender, document: Document, logging_group=None, **kwar
     )
 
 
-def _is_public_ip(ip: str) -> bool:
-    try:
-        obj = ipaddress.ip_address(ip)
-        return not (
-            obj.is_private
-            or obj.is_loopback
-            or obj.is_link_local
-            or obj.is_multicast
-            or obj.is_unspecified
-        )
-    except ValueError:  # pragma: no cover
-        return False
-
-
-def _resolve_first_ip(host: str) -> str | None:
-    try:
-        info = socket.getaddrinfo(host, None)
-        return info[0][4][0] if info else None
-    except Exception:  # pragma: no cover
-        return None
-
-
-@shared_task(
-    retry_backoff=True,
-    autoretry_for=(httpx.HTTPStatusError,),
-    max_retries=3,
-    throws=(httpx.HTTPError,),
-)
-def send_webhook(
-    url: str,
-    data: str | dict,
-    headers: dict,
-    files: dict,
-    *,
-    as_json: bool = False,
-):
-    p = urlparse(url)
-    if p.scheme.lower() not in settings.WEBHOOKS_ALLOWED_SCHEMES or not p.hostname:
-        logger.warning("Webhook blocked: invalid scheme/hostname")
-        raise ValueError("Invalid URL scheme or hostname.")
-
-    port = p.port or (443 if p.scheme == "https" else 80)
-    if (
-        len(settings.WEBHOOKS_ALLOWED_PORTS) > 0
-        and port not in settings.WEBHOOKS_ALLOWED_PORTS
-    ):
-        logger.warning("Webhook blocked: port not permitted")
-        raise ValueError("Destination port not permitted.")
-
-    ip = _resolve_first_ip(p.hostname)
-    if not ip or (
-        not _is_public_ip(ip) and not settings.WEBHOOKS_ALLOW_INTERNAL_REQUESTS
-    ):
-        logger.warning("Webhook blocked: destination not allowed")
-        raise ValueError("Destination host is not allowed.")
-
-    try:
-        post_args = {
-            "url": url,
-            "headers": {
-                k: v for k, v in (headers or {}).items() if k.lower() != "host"
-            },
-            "files": files or None,
-            "timeout": 5.0,
-            "follow_redirects": False,
-        }
-        if as_json:
-            post_args["json"] = data
-        elif isinstance(data, dict):
-            post_args["data"] = data
-        else:
-            post_args["content"] = data
-
-        httpx.post(
-            **post_args,
-        ).raise_for_status()
-        logger.info(
-            f"Webhook sent to {url}",
-        )
-    except Exception as e:
-        logger.error(
-            f"Failed attempt sending webhook to {url}: {e}",
-        )
-        raise e
-
-
 def run_workflows(
     trigger_type: WorkflowTrigger.WorkflowTriggerType,
     document: Document | ConsumableDocument,
@@ -767,572 +678,16 @@ def run_workflows(
     overrides: DocumentMetadataOverrides | None = None,
     original_file: Path | None = None,
 ) -> tuple[DocumentMetadataOverrides, str] | None:
-    """Run workflows which match a Document (or ConsumableDocument) for a specific trigger type or a single workflow if given.
-
-    Assignment or removal actions are either applied directly to the document or an overrides object. If an overrides
-    object is provided, the function returns the object with the applied changes or None if no actions were applied and a string
-    of messages for each action. If no overrides object is provided, the changes are applied directly to the document and the
-    function returns None.
     """
+    Execute workflows matching a document for the given trigger. When `overrides` is provided
+    (consumption flow), actions mutate that object and the function returns `(overrides, messages)`.
+    Otherwise actions mutate the actual document and return nothing.
 
-    def assignment_action():
-        if action.assign_tags.exists():
-            tag_ids_to_add: set[int] = set()
-            for tag in action.assign_tags.all():
-                tag_ids_to_add.add(tag.pk)
-                tag_ids_to_add.update(int(pk) for pk in tag.get_ancestors_pks())
-
-            if not use_overrides:
-                doc_tag_ids[:] = list(set(doc_tag_ids) | tag_ids_to_add)
-            else:
-                if overrides.tag_ids is None:
-                    overrides.tag_ids = []
-                overrides.tag_ids = list(set(overrides.tag_ids) | tag_ids_to_add)
-
-        if action.assign_correspondent:
-            if not use_overrides:
-                document.correspondent = action.assign_correspondent
-            else:
-                overrides.correspondent_id = action.assign_correspondent.pk
-
-        if action.assign_document_type:
-            if not use_overrides:
-                document.document_type = action.assign_document_type
-            else:
-                overrides.document_type_id = action.assign_document_type.pk
-
-        if action.assign_storage_path:
-            if not use_overrides:
-                document.storage_path = action.assign_storage_path
-            else:
-                overrides.storage_path_id = action.assign_storage_path.pk
-
-        if action.assign_owner:
-            if not use_overrides:
-                document.owner = action.assign_owner
-            else:
-                overrides.owner_id = action.assign_owner.pk
-
-        if action.assign_title:
-            if not use_overrides:
-                try:
-                    document.title = parse_w_workflow_placeholders(
-                        action.assign_title,
-                        document.correspondent.name if document.correspondent else "",
-                        document.document_type.name if document.document_type else "",
-                        document.owner.username if document.owner else "",
-                        timezone.localtime(document.added),
-                        document.original_filename or "",
-                        document.filename or "",
-                        document.created,
-                    )
-                except Exception:
-                    logger.exception(
-                        f"Error occurred parsing title assignment '{action.assign_title}', falling back to original",
-                        extra={"group": logging_group},
-                    )
-            else:
-                overrides.title = action.assign_title
-
-        if any(
-            [
-                action.assign_view_users.exists(),
-                action.assign_view_groups.exists(),
-                action.assign_change_users.exists(),
-                action.assign_change_groups.exists(),
-            ],
-        ):
-            permissions = {
-                "view": {
-                    "users": action.assign_view_users.values_list("id", flat=True),
-                    "groups": action.assign_view_groups.values_list("id", flat=True),
-                },
-                "change": {
-                    "users": action.assign_change_users.values_list("id", flat=True),
-                    "groups": action.assign_change_groups.values_list("id", flat=True),
-                },
-            }
-            if not use_overrides:
-                set_permissions_for_object(
-                    permissions=permissions,
-                    object=document,
-                    merge=True,
-                )
-            else:
-                overrides.view_users = list(
-                    set(
-                        (overrides.view_users or [])
-                        + list(permissions["view"]["users"]),
-                    ),
-                )
-                overrides.view_groups = list(
-                    set(
-                        (overrides.view_groups or [])
-                        + list(permissions["view"]["groups"]),
-                    ),
-                )
-                overrides.change_users = list(
-                    set(
-                        (overrides.change_users or [])
-                        + list(permissions["change"]["users"]),
-                    ),
-                )
-                overrides.change_groups = list(
-                    set(
-                        (overrides.change_groups or [])
-                        + list(permissions["change"]["groups"]),
-                    ),
-                )
+    Attachments for email/webhook actions use `original_file` when given, otherwise fall back to
+    `document.source_path` (Document) or `document.original_file` (ConsumableDocument).
 
-        if action.assign_custom_fields.exists():
-            if not use_overrides:
-                for field in action.assign_custom_fields.all():
-                    value_field_name = CustomFieldInstance.get_value_field_name(
-                        data_type=field.data_type,
-                    )
-                    args = {
-                        value_field_name: action.assign_custom_fields_values.get(
-                            str(field.pk),
-                            None,
-                        ),
-                    }
-                    # for some reason update_or_create doesn't work here
-                    instance = CustomFieldInstance.objects.filter(
-                        field=field,
-                        document=document,
-                    ).first()
-                    if instance and args[value_field_name] is not None:
-                        setattr(instance, value_field_name, args[value_field_name])
-                        instance.save()
-                    elif not instance:
-                        CustomFieldInstance.objects.create(
-                            **args,
-                            field=field,
-                            document=document,
-                        )
-            else:
-                if overrides.custom_fields is None:
-                    overrides.custom_fields = {}
-                overrides.custom_fields.update(
-                    {
-                        field.pk: action.assign_custom_fields_values.get(
-                            str(field.pk),
-                            None,
-                        )
-                        for field in action.assign_custom_fields.all()
-                    },
-                )
-
-    def removal_action():
-        if action.remove_all_tags:
-            if not use_overrides:
-                doc_tag_ids.clear()
-            else:
-                overrides.tag_ids = None
-        else:
-            tag_ids_to_remove: set[int] = set()
-            for tag in action.remove_tags.all():
-                tag_ids_to_remove.add(tag.pk)
-                tag_ids_to_remove.update(int(pk) for pk in tag.get_descendants_pks())
-
-            if not use_overrides:
-                doc_tag_ids[:] = [t for t in doc_tag_ids if t not in tag_ids_to_remove]
-            elif overrides.tag_ids:
-                overrides.tag_ids = [
-                    t for t in overrides.tag_ids if t not in tag_ids_to_remove
-                ]
-
-        if not use_overrides and (
-            action.remove_all_correspondents
-            or (
-                document.correspondent
-                and action.remove_correspondents.filter(
-                    pk=document.correspondent.pk,
-                ).exists()
-            )
-        ):
-            document.correspondent = None
-        elif use_overrides and (
-            action.remove_all_correspondents
-            or (
-                overrides.correspondent_id
-                and action.remove_correspondents.filter(
-                    pk=overrides.correspondent_id,
-                ).exists()
-            )
-        ):
-            overrides.correspondent_id = None
-
-        if not use_overrides and (
-            action.remove_all_document_types
-            or (
-                document.document_type
-                and action.remove_document_types.filter(
-                    pk=document.document_type.pk,
-                ).exists()
-            )
-        ):
-            document.document_type = None
-        elif use_overrides and (
-            action.remove_all_document_types
-            or (
-                overrides.document_type_id
-                and action.remove_document_types.filter(
-                    pk=overrides.document_type_id,
-                ).exists()
-            )
-        ):
-            overrides.document_type_id = None
-
-        if not use_overrides and (
-            action.remove_all_storage_paths
-            or (
-                document.storage_path
-                and action.remove_storage_paths.filter(
-                    pk=document.storage_path.pk,
-                ).exists()
-            )
-        ):
-            document.storage_path = None
-        elif use_overrides and (
-            action.remove_all_storage_paths
-            or (
-                overrides.storage_path_id
-                and action.remove_storage_paths.filter(
-                    pk=overrides.storage_path_id,
-                ).exists()
-            )
-        ):
-            overrides.storage_path_id = None
-
-        if not use_overrides and (
-            action.remove_all_owners
-            or (
-                document.owner
-                and action.remove_owners.filter(pk=document.owner.pk).exists()
-            )
-        ):
-            document.owner = None
-        elif use_overrides and (
-            action.remove_all_owners
-            or (
-                overrides.owner_id
-                and action.remove_owners.filter(pk=overrides.owner_id).exists()
-            )
-        ):
-            overrides.owner_id = None
-
-        if action.remove_all_permissions:
-            if not use_overrides:
-                permissions = {
-                    "view": {"users": [], "groups": []},
-                    "change": {"users": [], "groups": []},
-                }
-                set_permissions_for_object(
-                    permissions=permissions,
-                    object=document,
-                    merge=False,
-                )
-            else:
-                overrides.view_users = None
-                overrides.view_groups = None
-                overrides.change_users = None
-                overrides.change_groups = None
-        elif any(
-            [
-                action.remove_view_users.exists(),
-                action.remove_view_groups.exists(),
-                action.remove_change_users.exists(),
-                action.remove_change_groups.exists(),
-            ],
-        ):
-            if not use_overrides:
-                for user in action.remove_view_users.all():
-                    remove_perm("view_document", user, document)
-                for user in action.remove_change_users.all():
-                    remove_perm("change_document", user, document)
-                for group in action.remove_view_groups.all():
-                    remove_perm("view_document", group, document)
-                for group in action.remove_change_groups.all():
-                    remove_perm("change_document", group, document)
-            else:
-                if overrides.view_users:
-                    for user in action.remove_view_users.filter(
-                        pk__in=overrides.view_users,
-                    ):
-                        overrides.view_users.remove(user.pk)
-                if overrides.change_users:
-                    for user in action.remove_change_users.filter(
-                        pk__in=overrides.change_users,
-                    ):
-                        overrides.change_users.remove(user.pk)
-                if overrides.view_groups:
-                    for group in action.remove_view_groups.filter(
-                        pk__in=overrides.view_groups,
-                    ):
-                        overrides.view_groups.remove(group.pk)
-                if overrides.change_groups:
-                    for group in action.remove_change_groups.filter(
-                        pk__in=overrides.change_groups,
-                    ):
-                        overrides.change_groups.remove(group.pk)
-
-        if action.remove_all_custom_fields:
-            if not use_overrides:
-                CustomFieldInstance.objects.filter(document=document).hard_delete()
-            else:
-                overrides.custom_fields = None
-        elif action.remove_custom_fields.exists():
-            if not use_overrides:
-                CustomFieldInstance.objects.filter(
-                    field__in=action.remove_custom_fields.all(),
-                    document=document,
-                ).hard_delete()
-            elif overrides.custom_fields:
-                for field in action.remove_custom_fields.filter(
-                    pk__in=overrides.custom_fields.keys(),
-                ):
-                    overrides.custom_fields.pop(field.pk, None)
-
-    def email_action():
-        if not settings.EMAIL_ENABLED:
-            logger.error(
-                "Email backend has not been configured, cannot send email notifications",
-                extra={"group": logging_group},
-            )
-            return
-
-        if not use_overrides:
-            title = document.title
-            doc_url = (
-                f"{settings.PAPERLESS_URL}{settings.BASE_URL}documents/{document.pk}/"
-            )
-            correspondent = (
-                document.correspondent.name if document.correspondent else ""
-            )
-            document_type = (
-                document.document_type.name if document.document_type else ""
-            )
-            owner_username = document.owner.username if document.owner else ""
-            filename = document.original_filename or ""
-            current_filename = document.filename or ""
-            added = timezone.localtime(document.added)
-            created = document.created
-        else:
-            title = overrides.title if overrides.title else str(document.original_file)
-            doc_url = ""
-            correspondent = (
-                Correspondent.objects.filter(pk=overrides.correspondent_id).first()
-                if overrides.correspondent_id
-                else ""
-            )
-            document_type = (
-                DocumentType.objects.filter(pk=overrides.document_type_id).first().name
-                if overrides.document_type_id
-                else ""
-            )
-            owner_username = (
-                User.objects.filter(pk=overrides.owner_id).first().username
-                if overrides.owner_id
-                else ""
-            )
-            filename = document.original_file if document.original_file else ""
-            current_filename = filename
-            added = timezone.localtime(timezone.now())
-            created = overrides.created
-
-        subject = (
-            parse_w_workflow_placeholders(
-                action.email.subject,
-                correspondent,
-                document_type,
-                owner_username,
-                added,
-                filename,
-                current_filename,
-                created,
-                title,
-                doc_url,
-            )
-            if action.email.subject
-            else ""
-        )
-        body = (
-            parse_w_workflow_placeholders(
-                action.email.body,
-                correspondent,
-                document_type,
-                owner_username,
-                added,
-                filename,
-                current_filename,
-                created,
-                title,
-                doc_url,
-            )
-            if action.email.body
-            else ""
-        )
-        try:
-            attachments: list[EmailAttachment] = []
-            if action.email.include_document:
-                attachment: EmailAttachment | None = None
-                if trigger_type in [
-                    WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED,
-                    WorkflowTrigger.WorkflowTriggerType.SCHEDULED,
-                ] and isinstance(document, Document):
-                    friendly_name = (
-                        Path(current_filename).name
-                        if current_filename
-                        else document.source_path.name
-                    )
-                    attachment = EmailAttachment(
-                        path=document.source_path,
-                        mime_type=document.mime_type,
-                        friendly_name=friendly_name,
-                    )
-                elif original_file:
-                    friendly_name = (
-                        Path(current_filename).name
-                        if current_filename
-                        else original_file.name
-                    )
-                    attachment = EmailAttachment(
-                        path=original_file,
-                        mime_type=document.mime_type,
-                        friendly_name=friendly_name,
-                    )
-                if attachment:
-                    attachments = [attachment]
-            n_messages = send_email(
-                subject=subject,
-                body=body,
-                to=action.email.to.split(","),
-                attachments=attachments,
-            )
-            logger.debug(
-                f"Sent {n_messages} notification email(s) to {action.email.to}",
-                extra={"group": logging_group},
-            )
-        except Exception as e:
-            logger.exception(
-                f"Error occurred sending notification email: {e}",
-                extra={"group": logging_group},
-            )
-
-    def webhook_action():
-        if not use_overrides:
-            title = document.title
-            doc_url = (
-                f"{settings.PAPERLESS_URL}{settings.BASE_URL}documents/{document.pk}/"
-            )
-            correspondent = (
-                document.correspondent.name if document.correspondent else ""
-            )
-            document_type = (
-                document.document_type.name if document.document_type else ""
-            )
-            owner_username = document.owner.username if document.owner else ""
-            filename = document.original_filename or ""
-            current_filename = document.filename or ""
-            added = timezone.localtime(document.added)
-            created = document.created
-        else:
-            title = overrides.title if overrides.title else str(document.original_file)
-            doc_url = ""
-            correspondent = (
-                Correspondent.objects.filter(pk=overrides.correspondent_id).first()
-                if overrides.correspondent_id
-                else ""
-            )
-            document_type = (
-                DocumentType.objects.filter(pk=overrides.document_type_id).first().name
-                if overrides.document_type_id
-                else ""
-            )
-            owner_username = (
-                User.objects.filter(pk=overrides.owner_id).first().username
-                if overrides.owner_id
-                else ""
-            )
-            filename = document.original_file if document.original_file else ""
-            current_filename = filename
-            added = timezone.localtime(timezone.now())
-            created = overrides.created
-
-        try:
-            data = {}
-            if action.webhook.use_params:
-                if action.webhook.params:
-                    try:
-                        for key, value in action.webhook.params.items():
-                            data[key] = parse_w_workflow_placeholders(
-                                value,
-                                correspondent,
-                                document_type,
-                                owner_username,
-                                added,
-                                filename,
-                                current_filename,
-                                created,
-                                title,
-                                doc_url,
-                            )
-                    except Exception as e:
-                        logger.error(
-                            f"Error occurred parsing webhook params: {e}",
-                            extra={"group": logging_group},
-                        )
-            elif action.webhook.body:
-                data = parse_w_workflow_placeholders(
-                    action.webhook.body,
-                    correspondent,
-                    document_type,
-                    owner_username,
-                    added,
-                    filename,
-                    current_filename,
-                    created,
-                    title,
-                    doc_url,
-                )
-            headers = {}
-            if action.webhook.headers:
-                try:
-                    headers = {
-                        str(k): str(v) for k, v in action.webhook.headers.items()
-                    }
-                except Exception as e:
-                    logger.error(
-                        f"Error occurred parsing webhook headers: {e}",
-                        extra={"group": logging_group},
-                    )
-            files = None
-            if action.webhook.include_document:
-                with original_file.open("rb") as f:
-                    files = {
-                        "file": (
-                            filename,
-                            f.read(),
-                            document.mime_type,
-                        ),
-                    }
-            send_webhook.delay(
-                url=action.webhook.url,
-                data=data,
-                headers=headers,
-                files=files,
-                as_json=action.webhook.as_json,
-            )
-            logger.debug(
-                f"Webhook to {action.webhook.url} queued",
-                extra={"group": logging_group},
-            )
-        except Exception as e:
-            logger.exception(
-                f"Error occurred sending webhook: {e}",
-                extra={"group": logging_group},
-            )
+    Passing `workflow_to_run` skips the workflow query (currently only used by scheduled runs).
+    """
 
     use_overrides = overrides is not None
     if original_file is None:
@@ -1341,30 +696,7 @@ def run_workflows(
         )
     messages = []
 
-    workflows = (
-        (
-            Workflow.objects.filter(enabled=True, triggers__type=trigger_type)
-            .prefetch_related(
-                "actions",
-                "actions__assign_view_users",
-                "actions__assign_view_groups",
-                "actions__assign_change_users",
-                "actions__assign_change_groups",
-                "actions__assign_custom_fields",
-                "actions__remove_tags",
-                "actions__remove_correspondents",
-                "actions__remove_document_types",
-                "actions__remove_storage_paths",
-                "actions__remove_custom_fields",
-                "actions__remove_owners",
-                "triggers",
-            )
-            .order_by("order")
-            .distinct()
-        )
-        if workflow_to_run is None
-        else [workflow_to_run]
-    )
+    workflows = get_workflows_for_trigger(trigger_type, workflow_to_run)
 
     for workflow in workflows:
         if not use_overrides:
@@ -1384,13 +716,39 @@ def run_workflows(
                     messages.append(message)
 
                 if action.type == WorkflowAction.WorkflowActionType.ASSIGNMENT:
-                    assignment_action()
+                    if use_overrides and overrides:
+                        apply_assignment_to_overrides(action, overrides)
+                    else:
+                        apply_assignment_to_document(
+                            action,
+                            document,
+                            doc_tag_ids,
+                            logging_group,
+                        )
                 elif action.type == WorkflowAction.WorkflowActionType.REMOVAL:
-                    removal_action()
+                    if use_overrides and overrides:
+                        apply_removal_to_overrides(action, overrides)
+                    else:
+                        apply_removal_to_document(action, document, doc_tag_ids)
                 elif action.type == WorkflowAction.WorkflowActionType.EMAIL:
-                    email_action()
+                    context = build_workflow_action_context(document, overrides)
+                    execute_email_action(
+                        action,
+                        document,
+                        context,
+                        logging_group,
+                        original_file,
+                        trigger_type,
+                    )
                 elif action.type == WorkflowAction.WorkflowActionType.WEBHOOK:
-                    webhook_action()
+                    context = build_workflow_action_context(document, overrides)
+                    execute_webhook_action(
+                        action,
+                        document,
+                        context,
+                        logging_group,
+                        original_file,
+                    )
 
             if not use_overrides:
                 # limit title to 128 characters
index 17bfce3b066c845b3df81037380ea460bdc0d145..606f278dbcd6df0687719aefd3c83bf45dcc8f82 100644 (file)
@@ -41,7 +41,6 @@ from documents.models import DocumentType
 from documents.models import PaperlessTask
 from documents.models import StoragePath
 from documents.models import Tag
-from documents.models import Workflow
 from documents.models import WorkflowRun
 from documents.models import WorkflowTrigger
 from documents.parsers import DocumentParser
@@ -54,6 +53,7 @@ from documents.sanity_checker import SanityCheckFailedException
 from documents.signals import document_updated
 from documents.signals.handlers import cleanup_document_deletion
 from documents.signals.handlers import run_workflows
+from documents.workflows.utils import get_workflows_for_trigger
 
 if settings.AUDIT_LOG_ENABLED:
     from auditlog.models import LogEntry
@@ -400,13 +400,8 @@ def check_scheduled_workflows():
 
     Once a document satisfies this condition, and recurring/non-recurring constraints are met, the workflow is run.
     """
-    scheduled_workflows: list[Workflow] = (
-        Workflow.objects.filter(
-            triggers__type=WorkflowTrigger.WorkflowTriggerType.SCHEDULED,
-            enabled=True,
-        )
-        .distinct()
-        .prefetch_related("triggers")
+    scheduled_workflows = get_workflows_for_trigger(
+        WorkflowTrigger.WorkflowTriggerType.SCHEDULED,
     )
     if scheduled_workflows.count() > 0:
         logger.debug(f"Checking {len(scheduled_workflows)} scheduled workflows")
index 6438e4b100ffe40c73b1c716e9040195a4ba9b2b..e606bc5a072a19439e3981f7fe63c2e6e1475f47 100644 (file)
@@ -26,7 +26,7 @@ from rest_framework.test import APITestCase
 from documents.file_handling import create_source_path_directory
 from documents.file_handling import generate_unique_filename
 from documents.signals.handlers import run_workflows
-from documents.signals.handlers import send_webhook
+from documents.workflows.webhooks import send_webhook
 
 if TYPE_CHECKING:
     from django.db.models import QuerySet
@@ -2858,7 +2858,7 @@ class TestWorkflows(
 
         mock_email_send.return_value = 1
 
-        with self.assertNoLogs("paperless.handlers", level="ERROR"):
+        with self.assertNoLogs("paperless.workflows", level="ERROR"):
             run_workflows(
                 WorkflowTrigger.WorkflowTriggerType.CONSUMPTION,
                 consumable_document,
@@ -3096,7 +3096,7 @@ class TestWorkflows(
             original_filename="sample.pdf",
         )
 
-        with self.assertLogs("paperless.handlers", level="ERROR") as cm:
+        with self.assertLogs("paperless.workflows.actions", level="ERROR") as cm:
             run_workflows(WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED, doc)
 
             expected_str = "Email backend has not been configured"
@@ -3144,7 +3144,7 @@ class TestWorkflows(
             original_filename="sample.pdf",
         )
 
-        with self.assertLogs("paperless.handlers", level="ERROR") as cm:
+        with self.assertLogs("paperless.workflows", level="ERROR") as cm:
             run_workflows(WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED, doc)
 
             expected_str = "Error occurred sending email"
@@ -3215,7 +3215,7 @@ class TestWorkflows(
         PAPERLESS_FORCE_SCRIPT_NAME="/paperless",
         BASE_URL="/paperless/",
     )
-    @mock.patch("documents.signals.handlers.send_webhook.delay")
+    @mock.patch("documents.workflows.webhooks.send_webhook.delay")
     def test_workflow_webhook_action_body(self, mock_post):
         """
         GIVEN:
@@ -3274,7 +3274,7 @@ class TestWorkflows(
     @override_settings(
         PAPERLESS_URL="http://localhost:8000",
     )
-    @mock.patch("documents.signals.handlers.send_webhook.delay")
+    @mock.patch("documents.workflows.webhooks.send_webhook.delay")
     def test_workflow_webhook_action_w_files(self, mock_post):
         """
         GIVEN:
@@ -3377,7 +3377,7 @@ class TestWorkflows(
         )
 
         # fails because no file
-        with self.assertLogs("paperless.handlers", level="ERROR") as cm:
+        with self.assertLogs("paperless.workflows", level="ERROR") as cm:
             run_workflows(WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED, doc)
 
             expected_str = "Error occurred sending webhook"
@@ -3420,7 +3420,7 @@ class TestWorkflows(
             original_filename="sample.pdf",
         )
 
-        with self.assertLogs("paperless.handlers", level="ERROR") as cm:
+        with self.assertLogs("paperless.workflows", level="ERROR") as cm:
             run_workflows(WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED, doc)
 
             expected_str = "Error occurred parsing webhook params"
@@ -3436,7 +3436,7 @@ class TestWorkflows(
             raise_for_status=mock.Mock(),
         )
 
-        with self.assertLogs("paperless.handlers") as cm:
+        with self.assertLogs("paperless.workflows") as cm:
             send_webhook(
                 url="http://paperless-ngx.com",
                 data="Test message",
@@ -3482,7 +3482,7 @@ class TestWorkflows(
             ),
         )
 
-        with self.assertLogs("paperless.handlers") as cm:
+        with self.assertLogs("paperless.workflows") as cm:
             with self.assertRaises(HTTPStatusError):
                 send_webhook(
                     url="http://paperless-ngx.com",
@@ -3498,7 +3498,7 @@ class TestWorkflows(
                 )
                 self.assertIn(expected_str, cm.output[0])
 
-    @mock.patch("documents.signals.handlers.send_webhook.delay")
+    @mock.patch("documents.workflows.webhooks.send_webhook.delay")
     def test_workflow_webhook_action_consumption(self, mock_post):
         """
         GIVEN:
diff --git a/src/documents/workflows/__init__.py b/src/documents/workflows/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/documents/workflows/actions.py b/src/documents/workflows/actions.py
new file mode 100644 (file)
index 0000000..040cbc1
--- /dev/null
@@ -0,0 +1,261 @@
+import logging
+from pathlib import Path
+
+from django.conf import settings
+from django.contrib.auth.models import User
+from django.utils import timezone
+
+from documents.data_models import ConsumableDocument
+from documents.data_models import DocumentMetadataOverrides
+from documents.mail import EmailAttachment
+from documents.mail import send_email
+from documents.models import Correspondent
+from documents.models import Document
+from documents.models import DocumentType
+from documents.models import WorkflowAction
+from documents.models import WorkflowTrigger
+from documents.templating.workflows import parse_w_workflow_placeholders
+from documents.workflows.webhooks import send_webhook
+
+logger = logging.getLogger("paperless.workflows.actions")
+
+
+def build_workflow_action_context(
+    document: Document | ConsumableDocument,
+    overrides: DocumentMetadataOverrides | None,
+) -> dict:
+    """
+    Build context dictionary for workflow action placeholder parsing.
+    """
+    use_overrides = overrides is not None
+
+    if not use_overrides:
+        return {
+            "title": document.title,
+            "doc_url": f"{settings.PAPERLESS_URL}{settings.BASE_URL}documents/{document.pk}/",
+            "correspondent": document.correspondent.name
+            if document.correspondent
+            else "",
+            "document_type": document.document_type.name
+            if document.document_type
+            else "",
+            "owner_username": document.owner.username if document.owner else "",
+            "filename": document.original_filename or "",
+            "current_filename": document.filename or "",
+            "added": timezone.localtime(document.added),
+            "created": document.created,
+        }
+
+    correspondent_obj = (
+        Correspondent.objects.filter(pk=overrides.correspondent_id).first()
+        if overrides and overrides.correspondent_id
+        else None
+    )
+    document_type_obj = (
+        DocumentType.objects.filter(pk=overrides.document_type_id).first()
+        if overrides and overrides.document_type_id
+        else None
+    )
+    owner_obj = (
+        User.objects.filter(pk=overrides.owner_id).first()
+        if overrides and overrides.owner_id
+        else None
+    )
+
+    filename = document.original_file if document.original_file else ""
+    return {
+        "title": overrides.title
+        if overrides and overrides.title
+        else str(document.original_file),
+        "doc_url": "",
+        "correspondent": correspondent_obj.name if correspondent_obj else "",
+        "document_type": document_type_obj.name if document_type_obj else "",
+        "owner_username": owner_obj.username if owner_obj else "",
+        "filename": filename,
+        "current_filename": filename,
+        "added": timezone.localtime(timezone.now()),
+        "created": overrides.created if overrides else None,
+    }
+
+
+def execute_email_action(
+    action: WorkflowAction,
+    document: Document | ConsumableDocument,
+    context: dict,
+    logging_group,
+    original_file: Path,
+    trigger_type: WorkflowTrigger.WorkflowTriggerType,
+) -> None:
+    """
+    Execute an email action for a workflow.
+    """
+
+    if not settings.EMAIL_ENABLED:
+        logger.error(
+            "Email backend has not been configured, cannot send email notifications",
+            extra={"group": logging_group},
+        )
+        return
+
+    subject = (
+        parse_w_workflow_placeholders(
+            action.email.subject,
+            context["correspondent"],
+            context["document_type"],
+            context["owner_username"],
+            context["added"],
+            context["filename"],
+            context["current_filename"],
+            context["created"],
+            context["title"],
+            context["doc_url"],
+        )
+        if action.email.subject
+        else ""
+    )
+    body = (
+        parse_w_workflow_placeholders(
+            action.email.body,
+            context["correspondent"],
+            context["document_type"],
+            context["owner_username"],
+            context["added"],
+            context["filename"],
+            context["current_filename"],
+            context["created"],
+            context["title"],
+            context["doc_url"],
+        )
+        if action.email.body
+        else ""
+    )
+
+    try:
+        attachments: list[EmailAttachment] = []
+        if action.email.include_document:
+            attachment: EmailAttachment | None = None
+            if trigger_type in [
+                WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED,
+                WorkflowTrigger.WorkflowTriggerType.SCHEDULED,
+            ] and isinstance(document, Document):
+                friendly_name = (
+                    Path(context["current_filename"]).name
+                    if context["current_filename"]
+                    else document.source_path.name
+                )
+                attachment = EmailAttachment(
+                    path=document.source_path,
+                    mime_type=document.mime_type,
+                    friendly_name=friendly_name,
+                )
+            elif original_file:
+                friendly_name = (
+                    Path(context["current_filename"]).name
+                    if context["current_filename"]
+                    else original_file.name
+                )
+                attachment = EmailAttachment(
+                    path=original_file,
+                    mime_type=document.mime_type,
+                    friendly_name=friendly_name,
+                )
+            if attachment:
+                attachments = [attachment]
+
+        n_messages = send_email(
+            subject=subject,
+            body=body,
+            to=action.email.to.split(","),
+            attachments=attachments,
+        )
+        logger.debug(
+            f"Sent {n_messages} notification email(s) to {action.email.to}",
+            extra={"group": logging_group},
+        )
+    except Exception as e:
+        logger.exception(
+            f"Error occurred sending notification email: {e}",
+            extra={"group": logging_group},
+        )
+
+
+def execute_webhook_action(
+    action: WorkflowAction,
+    document: Document | ConsumableDocument,
+    context: dict,
+    logging_group,
+    original_file: Path,
+):
+    try:
+        data = {}
+        if action.webhook.use_params:
+            if action.webhook.params:
+                try:
+                    for key, value in action.webhook.params.items():
+                        data[key] = parse_w_workflow_placeholders(
+                            value,
+                            context["correspondent"],
+                            context["document_type"],
+                            context["owner_username"],
+                            context["added"],
+                            context["filename"],
+                            context["current_filename"],
+                            context["created"],
+                            context["title"],
+                            context["doc_url"],
+                        )
+                except Exception as e:
+                    logger.error(
+                        f"Error occurred parsing webhook params: {e}",
+                        extra={"group": logging_group},
+                    )
+        elif action.webhook.body:
+            data = parse_w_workflow_placeholders(
+                action.webhook.body,
+                context["correspondent"],
+                context["document_type"],
+                context["owner_username"],
+                context["added"],
+                context["filename"],
+                context["current_filename"],
+                context["created"],
+                context["title"],
+                context["doc_url"],
+            )
+        headers = {}
+        if action.webhook.headers:
+            try:
+                headers = {str(k): str(v) for k, v in action.webhook.headers.items()}
+            except Exception as e:
+                logger.error(
+                    f"Error occurred parsing webhook headers: {e}",
+                    extra={"group": logging_group},
+                )
+        files = None
+        if action.webhook.include_document:
+            with original_file.open("rb") as f:
+                files = {
+                    "file": (
+                        str(context["filename"])
+                        if context["filename"]
+                        else original_file.name,
+                        f.read(),
+                        document.mime_type,
+                    ),
+                }
+        send_webhook.delay(
+            url=action.webhook.url,
+            data=data,
+            headers=headers,
+            files=files,
+            as_json=action.webhook.as_json,
+        )
+        logger.debug(
+            f"Webhook to {action.webhook.url} queued",
+            extra={"group": logging_group},
+        )
+    except Exception as e:
+        logger.exception(
+            f"Error occurred sending webhook: {e}",
+            extra={"group": logging_group},
+        )
diff --git a/src/documents/workflows/mutations.py b/src/documents/workflows/mutations.py
new file mode 100644 (file)
index 0000000..ef85dba
--- /dev/null
@@ -0,0 +1,357 @@
+import logging
+
+from django.utils import timezone
+from guardian.shortcuts import remove_perm
+
+from documents.data_models import DocumentMetadataOverrides
+from documents.models import CustomFieldInstance
+from documents.models import Document
+from documents.models import WorkflowAction
+from documents.permissions import set_permissions_for_object
+from documents.templating.workflows import parse_w_workflow_placeholders
+
+logger = logging.getLogger("paperless.workflows.mutations")
+
+
+def apply_assignment_to_document(
+    action: WorkflowAction,
+    document: Document,
+    doc_tag_ids: list[int],
+    logging_group,
+):
+    """
+    Apply assignment actions to a Document instance.
+
+    action: WorkflowAction, annotated with 'has_assign_*' boolean fields
+    """
+    if action.has_assign_tags:
+        tag_ids_to_add: set[int] = set()
+        for tag in action.assign_tags.all():
+            tag_ids_to_add.add(tag.pk)
+            tag_ids_to_add.update(int(pk) for pk in tag.get_ancestors_pks())
+
+        doc_tag_ids[:] = list(set(doc_tag_ids) | tag_ids_to_add)
+
+    if action.assign_correspondent:
+        document.correspondent = action.assign_correspondent
+
+    if action.assign_document_type:
+        document.document_type = action.assign_document_type
+
+    if action.assign_storage_path:
+        document.storage_path = action.assign_storage_path
+
+    if action.assign_owner:
+        document.owner = action.assign_owner
+
+    if action.assign_title:
+        try:
+            document.title = parse_w_workflow_placeholders(
+                action.assign_title,
+                document.correspondent.name if document.correspondent else "",
+                document.document_type.name if document.document_type else "",
+                document.owner.username if document.owner else "",
+                timezone.localtime(document.added),
+                document.original_filename or "",
+                document.filename or "",
+                document.created,
+            )
+        except Exception:  # pragma: no cover
+            logger.exception(
+                f"Error occurred parsing title assignment '{action.assign_title}', falling back to original",
+                extra={"group": logging_group},
+            )
+
+    if any(
+        [
+            action.has_assign_view_users,
+            action.has_assign_view_groups,
+            action.has_assign_change_users,
+            action.has_assign_change_groups,
+        ],
+    ):
+        permissions = {
+            "view": {
+                "users": action.assign_view_users.values_list("id", flat=True),
+                "groups": action.assign_view_groups.values_list("id", flat=True),
+            },
+            "change": {
+                "users": action.assign_change_users.values_list("id", flat=True),
+                "groups": action.assign_change_groups.values_list("id", flat=True),
+            },
+        }
+        set_permissions_for_object(
+            permissions=permissions,
+            object=document,
+            merge=True,
+        )
+
+    if action.has_assign_custom_fields:
+        for field in action.assign_custom_fields.all():
+            value_field_name = CustomFieldInstance.get_value_field_name(
+                data_type=field.data_type,
+            )
+            args = {
+                value_field_name: action.assign_custom_fields_values.get(
+                    str(field.pk),
+                    None,
+                ),
+            }
+            # for some reason update_or_create doesn't work here
+            instance = CustomFieldInstance.objects.filter(
+                field=field,
+                document=document,
+            ).first()
+            if instance and args[value_field_name] is not None:
+                setattr(instance, value_field_name, args[value_field_name])
+                instance.save()
+            elif not instance:
+                CustomFieldInstance.objects.create(
+                    **args,
+                    field=field,
+                    document=document,
+                )
+
+
+def apply_assignment_to_overrides(
+    action: WorkflowAction,
+    overrides: DocumentMetadataOverrides,
+):
+    """
+    Apply assignment actions to DocumentMetadataOverrides.
+
+    action: WorkflowAction, annotated with 'has_assign_*' boolean fields
+    """
+    if action.has_assign_tags:
+        if overrides.tag_ids is None:
+            overrides.tag_ids = []
+        tag_ids_to_add: set[int] = set()
+        for tag in action.assign_tags.all():
+            tag_ids_to_add.add(tag.pk)
+            tag_ids_to_add.update(int(pk) for pk in tag.get_ancestors_pks())
+
+        overrides.tag_ids = list(set(overrides.tag_ids) | tag_ids_to_add)
+
+    if action.assign_correspondent:
+        overrides.correspondent_id = action.assign_correspondent.pk
+
+    if action.assign_document_type:
+        overrides.document_type_id = action.assign_document_type.pk
+
+    if action.assign_storage_path:
+        overrides.storage_path_id = action.assign_storage_path.pk
+
+    if action.assign_owner:
+        overrides.owner_id = action.assign_owner.pk
+
+    if action.assign_title:
+        overrides.title = action.assign_title
+
+    if any(
+        [
+            action.has_assign_view_users,
+            action.has_assign_view_groups,
+            action.has_assign_change_users,
+            action.has_assign_change_groups,
+        ],
+    ):
+        overrides.view_users = list(
+            set(
+                (overrides.view_users or [])
+                + list(action.assign_view_users.values_list("id", flat=True)),
+            ),
+        )
+        overrides.view_groups = list(
+            set(
+                (overrides.view_groups or [])
+                + list(action.assign_view_groups.values_list("id", flat=True)),
+            ),
+        )
+        overrides.change_users = list(
+            set(
+                (overrides.change_users or [])
+                + list(action.assign_change_users.values_list("id", flat=True)),
+            ),
+        )
+        overrides.change_groups = list(
+            set(
+                (overrides.change_groups or [])
+                + list(action.assign_change_groups.values_list("id", flat=True)),
+            ),
+        )
+
+    if action.has_assign_custom_fields:
+        if overrides.custom_fields is None:
+            overrides.custom_fields = {}
+        overrides.custom_fields.update(
+            {
+                field.pk: action.assign_custom_fields_values.get(
+                    str(field.pk),
+                    None,
+                )
+                for field in action.assign_custom_fields.all()
+            },
+        )
+
+
+def apply_removal_to_document(
+    action: WorkflowAction,
+    document: Document,
+    doc_tag_ids: list[int],
+):
+    """
+    Apply removal actions to a Document instance.
+
+    action: WorkflowAction, annotated with 'has_remove_*' boolean fields
+    """
+
+    if action.remove_all_tags:
+        doc_tag_ids.clear()
+    else:
+        tag_ids_to_remove: set[int] = set()
+        for tag in action.remove_tags.all():
+            tag_ids_to_remove.add(tag.pk)
+            tag_ids_to_remove.update(int(pk) for pk in tag.get_descendants_pks())
+
+        doc_tag_ids[:] = [t for t in doc_tag_ids if t not in tag_ids_to_remove]
+
+    if action.remove_all_correspondents or (
+        document.correspondent
+        and action.remove_correspondents.filter(pk=document.correspondent.pk).exists()
+    ):
+        document.correspondent = None
+
+    if action.remove_all_document_types or (
+        document.document_type
+        and action.remove_document_types.filter(pk=document.document_type.pk).exists()
+    ):
+        document.document_type = None
+
+    if action.remove_all_storage_paths or (
+        document.storage_path
+        and action.remove_storage_paths.filter(pk=document.storage_path.pk).exists()
+    ):
+        document.storage_path = None
+
+    if action.remove_all_owners or (
+        document.owner and action.remove_owners.filter(pk=document.owner.pk).exists()
+    ):
+        document.owner = None
+
+    if action.remove_all_permissions:
+        permissions = {
+            "view": {"users": [], "groups": []},
+            "change": {"users": [], "groups": []},
+        }
+        set_permissions_for_object(
+            permissions=permissions,
+            object=document,
+            merge=False,
+        )
+
+    if any(
+        [
+            action.has_remove_view_users,
+            action.has_remove_view_groups,
+            action.has_remove_change_users,
+            action.has_remove_change_groups,
+        ],
+    ):
+        for user in action.remove_view_users.all():
+            remove_perm("view_document", user, document)
+        for user in action.remove_change_users.all():
+            remove_perm("change_document", user, document)
+        for group in action.remove_view_groups.all():
+            remove_perm("view_document", group, document)
+        for group in action.remove_change_groups.all():
+            remove_perm("change_document", group, document)
+
+    if action.remove_all_custom_fields:
+        CustomFieldInstance.objects.filter(document=document).hard_delete()
+    elif action.has_remove_custom_fields:
+        CustomFieldInstance.objects.filter(
+            field__in=action.remove_custom_fields.all(),
+            document=document,
+        ).hard_delete()
+
+
+def apply_removal_to_overrides(
+    action: WorkflowAction,
+    overrides: DocumentMetadataOverrides,
+):
+    """
+    Apply removal actions to DocumentMetadataOverrides.
+
+    action: WorkflowAction, annotated with 'has_remove_*' boolean fields
+    """
+    if action.remove_all_tags:
+        overrides.tag_ids = None
+    elif overrides.tag_ids:
+        tag_ids_to_remove: set[int] = set()
+        for tag in action.remove_tags.all():
+            tag_ids_to_remove.add(tag.pk)
+            tag_ids_to_remove.update(int(pk) for pk in tag.get_descendants_pks())
+
+        overrides.tag_ids = [t for t in overrides.tag_ids if t not in tag_ids_to_remove]
+
+    if action.remove_all_correspondents or (
+        overrides.correspondent_id
+        and action.remove_correspondents.filter(pk=overrides.correspondent_id).exists()
+    ):
+        overrides.correspondent_id = None
+
+    if action.remove_all_document_types or (
+        overrides.document_type_id
+        and action.remove_document_types.filter(pk=overrides.document_type_id).exists()
+    ):
+        overrides.document_type_id = None
+
+    if action.remove_all_storage_paths or (
+        overrides.storage_path_id
+        and action.remove_storage_paths.filter(pk=overrides.storage_path_id).exists()
+    ):
+        overrides.storage_path_id = None
+
+    if action.remove_all_owners or (
+        overrides.owner_id
+        and action.remove_owners.filter(pk=overrides.owner_id).exists()
+    ):
+        overrides.owner_id = None
+
+    if action.remove_all_permissions:
+        overrides.view_users = None
+        overrides.view_groups = None
+        overrides.change_users = None
+        overrides.change_groups = None
+    elif any(
+        [
+            action.has_remove_view_users,
+            action.has_remove_view_groups,
+            action.has_remove_change_users,
+            action.has_remove_change_groups,
+        ],
+    ):
+        if overrides.view_users:
+            for user in action.remove_view_users.filter(pk__in=overrides.view_users):
+                overrides.view_users.remove(user.pk)
+        if overrides.change_users:
+            for user in action.remove_change_users.filter(
+                pk__in=overrides.change_users,
+            ):
+                overrides.change_users.remove(user.pk)
+        if overrides.view_groups:
+            for group in action.remove_view_groups.filter(pk__in=overrides.view_groups):
+                overrides.view_groups.remove(group.pk)
+        if overrides.change_groups:
+            for group in action.remove_change_groups.filter(
+                pk__in=overrides.change_groups,
+            ):
+                overrides.change_groups.remove(group.pk)
+
+    if action.remove_all_custom_fields:
+        overrides.custom_fields = None
+    elif action.has_remove_custom_fields and overrides.custom_fields:
+        for field in action.remove_custom_fields.filter(
+            pk__in=overrides.custom_fields.keys(),
+        ):
+            overrides.custom_fields.pop(field.pk, None)
diff --git a/src/documents/workflows/utils.py b/src/documents/workflows/utils.py
new file mode 100644 (file)
index 0000000..5536222
--- /dev/null
@@ -0,0 +1,116 @@
+import logging
+
+from django.db.models import Exists
+from django.db.models import OuterRef
+from django.db.models import Prefetch
+
+from documents.models import Workflow
+from documents.models import WorkflowAction
+from documents.models import WorkflowTrigger
+
+logger = logging.getLogger("paperless.workflows")
+
+
+def get_workflows_for_trigger(
+    trigger_type: WorkflowTrigger.WorkflowTriggerType,
+    workflow_to_run: Workflow | None = None,
+):
+    """
+    Return workflows relevant to a trigger. If a specific workflow is given,
+    wrap it in a list; otherwise fetch enabled workflows for the trigger with
+    the prefetches used by the runner.
+    """
+    if workflow_to_run is not None:
+        return [workflow_to_run]
+
+    annotated_actions = (
+        WorkflowAction.objects.select_related(
+            "assign_correspondent",
+            "assign_document_type",
+            "assign_storage_path",
+            "assign_owner",
+            "email",
+            "webhook",
+        )
+        .prefetch_related(
+            "assign_tags",
+            "assign_view_users",
+            "assign_view_groups",
+            "assign_change_users",
+            "assign_change_groups",
+            "assign_custom_fields",
+            "remove_tags",
+            "remove_correspondents",
+            "remove_document_types",
+            "remove_storage_paths",
+            "remove_custom_fields",
+            "remove_owners",
+        )
+        .annotate(
+            has_assign_tags=Exists(
+                WorkflowAction.assign_tags.through.objects.filter(
+                    workflowaction_id=OuterRef("pk"),
+                ),
+            ),
+            has_assign_view_users=Exists(
+                WorkflowAction.assign_view_users.through.objects.filter(
+                    workflowaction_id=OuterRef("pk"),
+                ),
+            ),
+            has_assign_view_groups=Exists(
+                WorkflowAction.assign_view_groups.through.objects.filter(
+                    workflowaction_id=OuterRef("pk"),
+                ),
+            ),
+            has_assign_change_users=Exists(
+                WorkflowAction.assign_change_users.through.objects.filter(
+                    workflowaction_id=OuterRef("pk"),
+                ),
+            ),
+            has_assign_change_groups=Exists(
+                WorkflowAction.assign_change_groups.through.objects.filter(
+                    workflowaction_id=OuterRef("pk"),
+                ),
+            ),
+            has_assign_custom_fields=Exists(
+                WorkflowAction.assign_custom_fields.through.objects.filter(
+                    workflowaction_id=OuterRef("pk"),
+                ),
+            ),
+            has_remove_view_users=Exists(
+                WorkflowAction.remove_view_users.through.objects.filter(
+                    workflowaction_id=OuterRef("pk"),
+                ),
+            ),
+            has_remove_view_groups=Exists(
+                WorkflowAction.remove_view_groups.through.objects.filter(
+                    workflowaction_id=OuterRef("pk"),
+                ),
+            ),
+            has_remove_change_users=Exists(
+                WorkflowAction.remove_change_users.through.objects.filter(
+                    workflowaction_id=OuterRef("pk"),
+                ),
+            ),
+            has_remove_change_groups=Exists(
+                WorkflowAction.remove_change_groups.through.objects.filter(
+                    workflowaction_id=OuterRef("pk"),
+                ),
+            ),
+            has_remove_custom_fields=Exists(
+                WorkflowAction.remove_custom_fields.through.objects.filter(
+                    workflowaction_id=OuterRef("pk"),
+                ),
+            ),
+        )
+    )
+
+    return (
+        Workflow.objects.filter(enabled=True, triggers__type=trigger_type)
+        .prefetch_related(
+            Prefetch("actions", queryset=annotated_actions),
+            "triggers",
+        )
+        .order_by("order")
+        .distinct()
+    )
diff --git a/src/documents/workflows/webhooks.py b/src/documents/workflows/webhooks.py
new file mode 100644 (file)
index 0000000..c7bb9f7
--- /dev/null
@@ -0,0 +1,96 @@
+import ipaddress
+import logging
+import socket
+from urllib.parse import urlparse
+
+import httpx
+from celery import shared_task
+from django.conf import settings
+
+logger = logging.getLogger("paperless.workflows.webhooks")
+
+
+def _is_public_ip(ip: str) -> bool:
+    try:
+        obj = ipaddress.ip_address(ip)
+        return not (
+            obj.is_private
+            or obj.is_loopback
+            or obj.is_link_local
+            or obj.is_multicast
+            or obj.is_unspecified
+        )
+    except ValueError:  # pragma: no cover
+        return False
+
+
+def _resolve_first_ip(host: str) -> str | None:
+    try:
+        info = socket.getaddrinfo(host, None)
+        return info[0][4][0] if info else None
+    except Exception:  # pragma: no cover
+        return None
+
+
+@shared_task(
+    retry_backoff=True,
+    autoretry_for=(httpx.HTTPStatusError,),
+    max_retries=3,
+    throws=(httpx.HTTPError,),
+)
+def send_webhook(
+    url: str,
+    data: str | dict,
+    headers: dict,
+    files: dict,
+    *,
+    as_json: bool = False,
+):
+    p = urlparse(url)
+    if p.scheme.lower() not in settings.WEBHOOKS_ALLOWED_SCHEMES or not p.hostname:
+        logger.warning("Webhook blocked: invalid scheme/hostname")
+        raise ValueError("Invalid URL scheme or hostname.")
+
+    port = p.port or (443 if p.scheme == "https" else 80)
+    if (
+        len(settings.WEBHOOKS_ALLOWED_PORTS) > 0
+        and port not in settings.WEBHOOKS_ALLOWED_PORTS
+    ):
+        logger.warning("Webhook blocked: port not permitted")
+        raise ValueError("Destination port not permitted.")
+
+    ip = _resolve_first_ip(p.hostname)
+    if not ip or (
+        not _is_public_ip(ip) and not settings.WEBHOOKS_ALLOW_INTERNAL_REQUESTS
+    ):
+        logger.warning("Webhook blocked: destination not allowed")
+        raise ValueError("Destination host is not allowed.")
+
+    try:
+        post_args = {
+            "url": url,
+            "headers": {
+                k: v for k, v in (headers or {}).items() if k.lower() != "host"
+            },
+            "files": files or None,
+            "timeout": 5.0,
+            "follow_redirects": False,
+        }
+        if as_json:
+            post_args["json"] = data
+        elif isinstance(data, dict):
+            post_args["data"] = data
+        else:
+            post_args["content"] = data
+
+        httpx.post(
+            **post_args,
+        ).raise_for_status()
+        logger.info(
+            f"Webhook sent to {url}",
+        )
+    except Exception as e:
+        logger.error(
+            f"Failed attempt sending webhook to {url}: {e}",
+        )
+        raise e