from __future__ import annotations
-import ipaddress
import logging
import shutil
-import socket
from pathlib import Path
from typing import TYPE_CHECKING
-from urllib.parse import urlparse
-import httpx
from celery import shared_task
from celery import states
from celery.signals import before_task_publish
from django.dispatch import receiver
from django.utils import timezone
from filelock import FileLock
-from guardian.shortcuts import remove_perm
from documents import matching
from documents.caching import clear_document_caches
from documents.file_handling import create_source_path_directory
from documents.file_handling import delete_empty_directories
from documents.file_handling import generate_unique_filename
-from documents.mail import EmailAttachment
-from documents.mail import send_email
-from documents.models import Correspondent
from documents.models import CustomField
from documents.models import CustomFieldInstance
from documents.models import Document
-from documents.models import DocumentType
from documents.models import MatchingModel
from documents.models import PaperlessTask
from documents.models import SavedView
from documents.models import WorkflowRun
from documents.models import WorkflowTrigger
from documents.permissions import get_objects_for_user_owner_aware
-from documents.permissions import set_permissions_for_object
-from documents.templating.workflows import parse_w_workflow_placeholders
+from documents.workflows.actions import build_workflow_action_context
+from documents.workflows.actions import execute_email_action
+from documents.workflows.actions import execute_webhook_action
+from documents.workflows.mutations import apply_assignment_to_document
+from documents.workflows.mutations import apply_assignment_to_overrides
+from documents.workflows.mutations import apply_removal_to_document
+from documents.workflows.mutations import apply_removal_to_overrides
+from documents.workflows.utils import get_workflows_for_trigger
if TYPE_CHECKING:
from documents.classifier import DocumentClassifier
)
-def _is_public_ip(ip: str) -> bool:
- try:
- obj = ipaddress.ip_address(ip)
- return not (
- obj.is_private
- or obj.is_loopback
- or obj.is_link_local
- or obj.is_multicast
- or obj.is_unspecified
- )
- except ValueError: # pragma: no cover
- return False
-
-
-def _resolve_first_ip(host: str) -> str | None:
- try:
- info = socket.getaddrinfo(host, None)
- return info[0][4][0] if info else None
- except Exception: # pragma: no cover
- return None
-
-
-@shared_task(
- retry_backoff=True,
- autoretry_for=(httpx.HTTPStatusError,),
- max_retries=3,
- throws=(httpx.HTTPError,),
-)
-def send_webhook(
- url: str,
- data: str | dict,
- headers: dict,
- files: dict,
- *,
- as_json: bool = False,
-):
- p = urlparse(url)
- if p.scheme.lower() not in settings.WEBHOOKS_ALLOWED_SCHEMES or not p.hostname:
- logger.warning("Webhook blocked: invalid scheme/hostname")
- raise ValueError("Invalid URL scheme or hostname.")
-
- port = p.port or (443 if p.scheme == "https" else 80)
- if (
- len(settings.WEBHOOKS_ALLOWED_PORTS) > 0
- and port not in settings.WEBHOOKS_ALLOWED_PORTS
- ):
- logger.warning("Webhook blocked: port not permitted")
- raise ValueError("Destination port not permitted.")
-
- ip = _resolve_first_ip(p.hostname)
- if not ip or (
- not _is_public_ip(ip) and not settings.WEBHOOKS_ALLOW_INTERNAL_REQUESTS
- ):
- logger.warning("Webhook blocked: destination not allowed")
- raise ValueError("Destination host is not allowed.")
-
- try:
- post_args = {
- "url": url,
- "headers": {
- k: v for k, v in (headers or {}).items() if k.lower() != "host"
- },
- "files": files or None,
- "timeout": 5.0,
- "follow_redirects": False,
- }
- if as_json:
- post_args["json"] = data
- elif isinstance(data, dict):
- post_args["data"] = data
- else:
- post_args["content"] = data
-
- httpx.post(
- **post_args,
- ).raise_for_status()
- logger.info(
- f"Webhook sent to {url}",
- )
- except Exception as e:
- logger.error(
- f"Failed attempt sending webhook to {url}: {e}",
- )
- raise e
-
-
def run_workflows(
trigger_type: WorkflowTrigger.WorkflowTriggerType,
document: Document | ConsumableDocument,
overrides: DocumentMetadataOverrides | None = None,
original_file: Path | None = None,
) -> tuple[DocumentMetadataOverrides, str] | None:
- """Run workflows which match a Document (or ConsumableDocument) for a specific trigger type or a single workflow if given.
-
- Assignment or removal actions are either applied directly to the document or an overrides object. If an overrides
- object is provided, the function returns the object with the applied changes or None if no actions were applied and a string
- of messages for each action. If no overrides object is provided, the changes are applied directly to the document and the
- function returns None.
"""
+ Execute workflows matching a document for the given trigger. When `overrides` is provided
+ (consumption flow), actions mutate that object and the function returns `(overrides, messages)`.
+ Otherwise actions mutate the actual document and return nothing.
- def assignment_action():
- if action.assign_tags.exists():
- tag_ids_to_add: set[int] = set()
- for tag in action.assign_tags.all():
- tag_ids_to_add.add(tag.pk)
- tag_ids_to_add.update(int(pk) for pk in tag.get_ancestors_pks())
-
- if not use_overrides:
- doc_tag_ids[:] = list(set(doc_tag_ids) | tag_ids_to_add)
- else:
- if overrides.tag_ids is None:
- overrides.tag_ids = []
- overrides.tag_ids = list(set(overrides.tag_ids) | tag_ids_to_add)
-
- if action.assign_correspondent:
- if not use_overrides:
- document.correspondent = action.assign_correspondent
- else:
- overrides.correspondent_id = action.assign_correspondent.pk
-
- if action.assign_document_type:
- if not use_overrides:
- document.document_type = action.assign_document_type
- else:
- overrides.document_type_id = action.assign_document_type.pk
-
- if action.assign_storage_path:
- if not use_overrides:
- document.storage_path = action.assign_storage_path
- else:
- overrides.storage_path_id = action.assign_storage_path.pk
-
- if action.assign_owner:
- if not use_overrides:
- document.owner = action.assign_owner
- else:
- overrides.owner_id = action.assign_owner.pk
-
- if action.assign_title:
- if not use_overrides:
- try:
- document.title = parse_w_workflow_placeholders(
- action.assign_title,
- document.correspondent.name if document.correspondent else "",
- document.document_type.name if document.document_type else "",
- document.owner.username if document.owner else "",
- timezone.localtime(document.added),
- document.original_filename or "",
- document.filename or "",
- document.created,
- )
- except Exception:
- logger.exception(
- f"Error occurred parsing title assignment '{action.assign_title}', falling back to original",
- extra={"group": logging_group},
- )
- else:
- overrides.title = action.assign_title
-
- if any(
- [
- action.assign_view_users.exists(),
- action.assign_view_groups.exists(),
- action.assign_change_users.exists(),
- action.assign_change_groups.exists(),
- ],
- ):
- permissions = {
- "view": {
- "users": action.assign_view_users.values_list("id", flat=True),
- "groups": action.assign_view_groups.values_list("id", flat=True),
- },
- "change": {
- "users": action.assign_change_users.values_list("id", flat=True),
- "groups": action.assign_change_groups.values_list("id", flat=True),
- },
- }
- if not use_overrides:
- set_permissions_for_object(
- permissions=permissions,
- object=document,
- merge=True,
- )
- else:
- overrides.view_users = list(
- set(
- (overrides.view_users or [])
- + list(permissions["view"]["users"]),
- ),
- )
- overrides.view_groups = list(
- set(
- (overrides.view_groups or [])
- + list(permissions["view"]["groups"]),
- ),
- )
- overrides.change_users = list(
- set(
- (overrides.change_users or [])
- + list(permissions["change"]["users"]),
- ),
- )
- overrides.change_groups = list(
- set(
- (overrides.change_groups or [])
- + list(permissions["change"]["groups"]),
- ),
- )
+ Attachments for email/webhook actions use `original_file` when given, otherwise fall back to
+ `document.source_path` (Document) or `document.original_file` (ConsumableDocument).
- if action.assign_custom_fields.exists():
- if not use_overrides:
- for field in action.assign_custom_fields.all():
- value_field_name = CustomFieldInstance.get_value_field_name(
- data_type=field.data_type,
- )
- args = {
- value_field_name: action.assign_custom_fields_values.get(
- str(field.pk),
- None,
- ),
- }
- # for some reason update_or_create doesn't work here
- instance = CustomFieldInstance.objects.filter(
- field=field,
- document=document,
- ).first()
- if instance and args[value_field_name] is not None:
- setattr(instance, value_field_name, args[value_field_name])
- instance.save()
- elif not instance:
- CustomFieldInstance.objects.create(
- **args,
- field=field,
- document=document,
- )
- else:
- if overrides.custom_fields is None:
- overrides.custom_fields = {}
- overrides.custom_fields.update(
- {
- field.pk: action.assign_custom_fields_values.get(
- str(field.pk),
- None,
- )
- for field in action.assign_custom_fields.all()
- },
- )
-
- def removal_action():
- if action.remove_all_tags:
- if not use_overrides:
- doc_tag_ids.clear()
- else:
- overrides.tag_ids = None
- else:
- tag_ids_to_remove: set[int] = set()
- for tag in action.remove_tags.all():
- tag_ids_to_remove.add(tag.pk)
- tag_ids_to_remove.update(int(pk) for pk in tag.get_descendants_pks())
-
- if not use_overrides:
- doc_tag_ids[:] = [t for t in doc_tag_ids if t not in tag_ids_to_remove]
- elif overrides.tag_ids:
- overrides.tag_ids = [
- t for t in overrides.tag_ids if t not in tag_ids_to_remove
- ]
-
- if not use_overrides and (
- action.remove_all_correspondents
- or (
- document.correspondent
- and action.remove_correspondents.filter(
- pk=document.correspondent.pk,
- ).exists()
- )
- ):
- document.correspondent = None
- elif use_overrides and (
- action.remove_all_correspondents
- or (
- overrides.correspondent_id
- and action.remove_correspondents.filter(
- pk=overrides.correspondent_id,
- ).exists()
- )
- ):
- overrides.correspondent_id = None
-
- if not use_overrides and (
- action.remove_all_document_types
- or (
- document.document_type
- and action.remove_document_types.filter(
- pk=document.document_type.pk,
- ).exists()
- )
- ):
- document.document_type = None
- elif use_overrides and (
- action.remove_all_document_types
- or (
- overrides.document_type_id
- and action.remove_document_types.filter(
- pk=overrides.document_type_id,
- ).exists()
- )
- ):
- overrides.document_type_id = None
-
- if not use_overrides and (
- action.remove_all_storage_paths
- or (
- document.storage_path
- and action.remove_storage_paths.filter(
- pk=document.storage_path.pk,
- ).exists()
- )
- ):
- document.storage_path = None
- elif use_overrides and (
- action.remove_all_storage_paths
- or (
- overrides.storage_path_id
- and action.remove_storage_paths.filter(
- pk=overrides.storage_path_id,
- ).exists()
- )
- ):
- overrides.storage_path_id = None
-
- if not use_overrides and (
- action.remove_all_owners
- or (
- document.owner
- and action.remove_owners.filter(pk=document.owner.pk).exists()
- )
- ):
- document.owner = None
- elif use_overrides and (
- action.remove_all_owners
- or (
- overrides.owner_id
- and action.remove_owners.filter(pk=overrides.owner_id).exists()
- )
- ):
- overrides.owner_id = None
-
- if action.remove_all_permissions:
- if not use_overrides:
- permissions = {
- "view": {"users": [], "groups": []},
- "change": {"users": [], "groups": []},
- }
- set_permissions_for_object(
- permissions=permissions,
- object=document,
- merge=False,
- )
- else:
- overrides.view_users = None
- overrides.view_groups = None
- overrides.change_users = None
- overrides.change_groups = None
- elif any(
- [
- action.remove_view_users.exists(),
- action.remove_view_groups.exists(),
- action.remove_change_users.exists(),
- action.remove_change_groups.exists(),
- ],
- ):
- if not use_overrides:
- for user in action.remove_view_users.all():
- remove_perm("view_document", user, document)
- for user in action.remove_change_users.all():
- remove_perm("change_document", user, document)
- for group in action.remove_view_groups.all():
- remove_perm("view_document", group, document)
- for group in action.remove_change_groups.all():
- remove_perm("change_document", group, document)
- else:
- if overrides.view_users:
- for user in action.remove_view_users.filter(
- pk__in=overrides.view_users,
- ):
- overrides.view_users.remove(user.pk)
- if overrides.change_users:
- for user in action.remove_change_users.filter(
- pk__in=overrides.change_users,
- ):
- overrides.change_users.remove(user.pk)
- if overrides.view_groups:
- for group in action.remove_view_groups.filter(
- pk__in=overrides.view_groups,
- ):
- overrides.view_groups.remove(group.pk)
- if overrides.change_groups:
- for group in action.remove_change_groups.filter(
- pk__in=overrides.change_groups,
- ):
- overrides.change_groups.remove(group.pk)
-
- if action.remove_all_custom_fields:
- if not use_overrides:
- CustomFieldInstance.objects.filter(document=document).hard_delete()
- else:
- overrides.custom_fields = None
- elif action.remove_custom_fields.exists():
- if not use_overrides:
- CustomFieldInstance.objects.filter(
- field__in=action.remove_custom_fields.all(),
- document=document,
- ).hard_delete()
- elif overrides.custom_fields:
- for field in action.remove_custom_fields.filter(
- pk__in=overrides.custom_fields.keys(),
- ):
- overrides.custom_fields.pop(field.pk, None)
-
- def email_action():
- if not settings.EMAIL_ENABLED:
- logger.error(
- "Email backend has not been configured, cannot send email notifications",
- extra={"group": logging_group},
- )
- return
-
- if not use_overrides:
- title = document.title
- doc_url = (
- f"{settings.PAPERLESS_URL}{settings.BASE_URL}documents/{document.pk}/"
- )
- correspondent = (
- document.correspondent.name if document.correspondent else ""
- )
- document_type = (
- document.document_type.name if document.document_type else ""
- )
- owner_username = document.owner.username if document.owner else ""
- filename = document.original_filename or ""
- current_filename = document.filename or ""
- added = timezone.localtime(document.added)
- created = document.created
- else:
- title = overrides.title if overrides.title else str(document.original_file)
- doc_url = ""
- correspondent = (
- Correspondent.objects.filter(pk=overrides.correspondent_id).first()
- if overrides.correspondent_id
- else ""
- )
- document_type = (
- DocumentType.objects.filter(pk=overrides.document_type_id).first().name
- if overrides.document_type_id
- else ""
- )
- owner_username = (
- User.objects.filter(pk=overrides.owner_id).first().username
- if overrides.owner_id
- else ""
- )
- filename = document.original_file if document.original_file else ""
- current_filename = filename
- added = timezone.localtime(timezone.now())
- created = overrides.created
-
- subject = (
- parse_w_workflow_placeholders(
- action.email.subject,
- correspondent,
- document_type,
- owner_username,
- added,
- filename,
- current_filename,
- created,
- title,
- doc_url,
- )
- if action.email.subject
- else ""
- )
- body = (
- parse_w_workflow_placeholders(
- action.email.body,
- correspondent,
- document_type,
- owner_username,
- added,
- filename,
- current_filename,
- created,
- title,
- doc_url,
- )
- if action.email.body
- else ""
- )
- try:
- attachments: list[EmailAttachment] = []
- if action.email.include_document:
- attachment: EmailAttachment | None = None
- if trigger_type in [
- WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED,
- WorkflowTrigger.WorkflowTriggerType.SCHEDULED,
- ] and isinstance(document, Document):
- friendly_name = (
- Path(current_filename).name
- if current_filename
- else document.source_path.name
- )
- attachment = EmailAttachment(
- path=document.source_path,
- mime_type=document.mime_type,
- friendly_name=friendly_name,
- )
- elif original_file:
- friendly_name = (
- Path(current_filename).name
- if current_filename
- else original_file.name
- )
- attachment = EmailAttachment(
- path=original_file,
- mime_type=document.mime_type,
- friendly_name=friendly_name,
- )
- if attachment:
- attachments = [attachment]
- n_messages = send_email(
- subject=subject,
- body=body,
- to=action.email.to.split(","),
- attachments=attachments,
- )
- logger.debug(
- f"Sent {n_messages} notification email(s) to {action.email.to}",
- extra={"group": logging_group},
- )
- except Exception as e:
- logger.exception(
- f"Error occurred sending notification email: {e}",
- extra={"group": logging_group},
- )
-
- def webhook_action():
- if not use_overrides:
- title = document.title
- doc_url = (
- f"{settings.PAPERLESS_URL}{settings.BASE_URL}documents/{document.pk}/"
- )
- correspondent = (
- document.correspondent.name if document.correspondent else ""
- )
- document_type = (
- document.document_type.name if document.document_type else ""
- )
- owner_username = document.owner.username if document.owner else ""
- filename = document.original_filename or ""
- current_filename = document.filename or ""
- added = timezone.localtime(document.added)
- created = document.created
- else:
- title = overrides.title if overrides.title else str(document.original_file)
- doc_url = ""
- correspondent = (
- Correspondent.objects.filter(pk=overrides.correspondent_id).first()
- if overrides.correspondent_id
- else ""
- )
- document_type = (
- DocumentType.objects.filter(pk=overrides.document_type_id).first().name
- if overrides.document_type_id
- else ""
- )
- owner_username = (
- User.objects.filter(pk=overrides.owner_id).first().username
- if overrides.owner_id
- else ""
- )
- filename = document.original_file if document.original_file else ""
- current_filename = filename
- added = timezone.localtime(timezone.now())
- created = overrides.created
-
- try:
- data = {}
- if action.webhook.use_params:
- if action.webhook.params:
- try:
- for key, value in action.webhook.params.items():
- data[key] = parse_w_workflow_placeholders(
- value,
- correspondent,
- document_type,
- owner_username,
- added,
- filename,
- current_filename,
- created,
- title,
- doc_url,
- )
- except Exception as e:
- logger.error(
- f"Error occurred parsing webhook params: {e}",
- extra={"group": logging_group},
- )
- elif action.webhook.body:
- data = parse_w_workflow_placeholders(
- action.webhook.body,
- correspondent,
- document_type,
- owner_username,
- added,
- filename,
- current_filename,
- created,
- title,
- doc_url,
- )
- headers = {}
- if action.webhook.headers:
- try:
- headers = {
- str(k): str(v) for k, v in action.webhook.headers.items()
- }
- except Exception as e:
- logger.error(
- f"Error occurred parsing webhook headers: {e}",
- extra={"group": logging_group},
- )
- files = None
- if action.webhook.include_document:
- with original_file.open("rb") as f:
- files = {
- "file": (
- filename,
- f.read(),
- document.mime_type,
- ),
- }
- send_webhook.delay(
- url=action.webhook.url,
- data=data,
- headers=headers,
- files=files,
- as_json=action.webhook.as_json,
- )
- logger.debug(
- f"Webhook to {action.webhook.url} queued",
- extra={"group": logging_group},
- )
- except Exception as e:
- logger.exception(
- f"Error occurred sending webhook: {e}",
- extra={"group": logging_group},
- )
+ Passing `workflow_to_run` skips the workflow query (currently only used by scheduled runs).
+ """
use_overrides = overrides is not None
if original_file is None:
)
messages = []
- workflows = (
- (
- Workflow.objects.filter(enabled=True, triggers__type=trigger_type)
- .prefetch_related(
- "actions",
- "actions__assign_view_users",
- "actions__assign_view_groups",
- "actions__assign_change_users",
- "actions__assign_change_groups",
- "actions__assign_custom_fields",
- "actions__remove_tags",
- "actions__remove_correspondents",
- "actions__remove_document_types",
- "actions__remove_storage_paths",
- "actions__remove_custom_fields",
- "actions__remove_owners",
- "triggers",
- )
- .order_by("order")
- .distinct()
- )
- if workflow_to_run is None
- else [workflow_to_run]
- )
+ workflows = get_workflows_for_trigger(trigger_type, workflow_to_run)
for workflow in workflows:
if not use_overrides:
messages.append(message)
if action.type == WorkflowAction.WorkflowActionType.ASSIGNMENT:
- assignment_action()
+ if use_overrides and overrides:
+ apply_assignment_to_overrides(action, overrides)
+ else:
+ apply_assignment_to_document(
+ action,
+ document,
+ doc_tag_ids,
+ logging_group,
+ )
elif action.type == WorkflowAction.WorkflowActionType.REMOVAL:
- removal_action()
+ if use_overrides and overrides:
+ apply_removal_to_overrides(action, overrides)
+ else:
+ apply_removal_to_document(action, document, doc_tag_ids)
elif action.type == WorkflowAction.WorkflowActionType.EMAIL:
- email_action()
+ context = build_workflow_action_context(document, overrides)
+ execute_email_action(
+ action,
+ document,
+ context,
+ logging_group,
+ original_file,
+ trigger_type,
+ )
elif action.type == WorkflowAction.WorkflowActionType.WEBHOOK:
- webhook_action()
+ context = build_workflow_action_context(document, overrides)
+ execute_webhook_action(
+ action,
+ document,
+ context,
+ logging_group,
+ original_file,
+ )
if not use_overrides:
# limit title to 128 characters
from documents.models import PaperlessTask
from documents.models import StoragePath
from documents.models import Tag
-from documents.models import Workflow
from documents.models import WorkflowRun
from documents.models import WorkflowTrigger
from documents.parsers import DocumentParser
from documents.signals import document_updated
from documents.signals.handlers import cleanup_document_deletion
from documents.signals.handlers import run_workflows
+from documents.workflows.utils import get_workflows_for_trigger
if settings.AUDIT_LOG_ENABLED:
from auditlog.models import LogEntry
Once a document satisfies this condition, and recurring/non-recurring constraints are met, the workflow is run.
"""
- scheduled_workflows: list[Workflow] = (
- Workflow.objects.filter(
- triggers__type=WorkflowTrigger.WorkflowTriggerType.SCHEDULED,
- enabled=True,
- )
- .distinct()
- .prefetch_related("triggers")
+ scheduled_workflows = get_workflows_for_trigger(
+ WorkflowTrigger.WorkflowTriggerType.SCHEDULED,
)
if scheduled_workflows.count() > 0:
logger.debug(f"Checking {len(scheduled_workflows)} scheduled workflows")
from documents.file_handling import create_source_path_directory
from documents.file_handling import generate_unique_filename
from documents.signals.handlers import run_workflows
-from documents.signals.handlers import send_webhook
+from documents.workflows.webhooks import send_webhook
if TYPE_CHECKING:
from django.db.models import QuerySet
mock_email_send.return_value = 1
- with self.assertNoLogs("paperless.handlers", level="ERROR"):
+ with self.assertNoLogs("paperless.workflows", level="ERROR"):
run_workflows(
WorkflowTrigger.WorkflowTriggerType.CONSUMPTION,
consumable_document,
original_filename="sample.pdf",
)
- with self.assertLogs("paperless.handlers", level="ERROR") as cm:
+ with self.assertLogs("paperless.workflows.actions", level="ERROR") as cm:
run_workflows(WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED, doc)
expected_str = "Email backend has not been configured"
original_filename="sample.pdf",
)
- with self.assertLogs("paperless.handlers", level="ERROR") as cm:
+ with self.assertLogs("paperless.workflows", level="ERROR") as cm:
run_workflows(WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED, doc)
expected_str = "Error occurred sending email"
PAPERLESS_FORCE_SCRIPT_NAME="/paperless",
BASE_URL="/paperless/",
)
- @mock.patch("documents.signals.handlers.send_webhook.delay")
+ @mock.patch("documents.workflows.webhooks.send_webhook.delay")
def test_workflow_webhook_action_body(self, mock_post):
"""
GIVEN:
@override_settings(
PAPERLESS_URL="http://localhost:8000",
)
- @mock.patch("documents.signals.handlers.send_webhook.delay")
+ @mock.patch("documents.workflows.webhooks.send_webhook.delay")
def test_workflow_webhook_action_w_files(self, mock_post):
"""
GIVEN:
)
# fails because no file
- with self.assertLogs("paperless.handlers", level="ERROR") as cm:
+ with self.assertLogs("paperless.workflows", level="ERROR") as cm:
run_workflows(WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED, doc)
expected_str = "Error occurred sending webhook"
original_filename="sample.pdf",
)
- with self.assertLogs("paperless.handlers", level="ERROR") as cm:
+ with self.assertLogs("paperless.workflows", level="ERROR") as cm:
run_workflows(WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED, doc)
expected_str = "Error occurred parsing webhook params"
raise_for_status=mock.Mock(),
)
- with self.assertLogs("paperless.handlers") as cm:
+ with self.assertLogs("paperless.workflows") as cm:
send_webhook(
url="http://paperless-ngx.com",
data="Test message",
),
)
- with self.assertLogs("paperless.handlers") as cm:
+ with self.assertLogs("paperless.workflows") as cm:
with self.assertRaises(HTTPStatusError):
send_webhook(
url="http://paperless-ngx.com",
)
self.assertIn(expected_str, cm.output[0])
- @mock.patch("documents.signals.handlers.send_webhook.delay")
+ @mock.patch("documents.workflows.webhooks.send_webhook.delay")
def test_workflow_webhook_action_consumption(self, mock_post):
"""
GIVEN:
--- /dev/null
+import logging
+from pathlib import Path
+
+from django.conf import settings
+from django.contrib.auth.models import User
+from django.utils import timezone
+
+from documents.data_models import ConsumableDocument
+from documents.data_models import DocumentMetadataOverrides
+from documents.mail import EmailAttachment
+from documents.mail import send_email
+from documents.models import Correspondent
+from documents.models import Document
+from documents.models import DocumentType
+from documents.models import WorkflowAction
+from documents.models import WorkflowTrigger
+from documents.templating.workflows import parse_w_workflow_placeholders
+from documents.workflows.webhooks import send_webhook
+
+logger = logging.getLogger("paperless.workflows.actions")
+
+
+def build_workflow_action_context(
+ document: Document | ConsumableDocument,
+ overrides: DocumentMetadataOverrides | None,
+) -> dict:
+ """
+ Build context dictionary for workflow action placeholder parsing.
+ """
+ use_overrides = overrides is not None
+
+ if not use_overrides:
+ return {
+ "title": document.title,
+ "doc_url": f"{settings.PAPERLESS_URL}{settings.BASE_URL}documents/{document.pk}/",
+ "correspondent": document.correspondent.name
+ if document.correspondent
+ else "",
+ "document_type": document.document_type.name
+ if document.document_type
+ else "",
+ "owner_username": document.owner.username if document.owner else "",
+ "filename": document.original_filename or "",
+ "current_filename": document.filename or "",
+ "added": timezone.localtime(document.added),
+ "created": document.created,
+ }
+
+ correspondent_obj = (
+ Correspondent.objects.filter(pk=overrides.correspondent_id).first()
+ if overrides and overrides.correspondent_id
+ else None
+ )
+ document_type_obj = (
+ DocumentType.objects.filter(pk=overrides.document_type_id).first()
+ if overrides and overrides.document_type_id
+ else None
+ )
+ owner_obj = (
+ User.objects.filter(pk=overrides.owner_id).first()
+ if overrides and overrides.owner_id
+ else None
+ )
+
+ filename = document.original_file if document.original_file else ""
+ return {
+ "title": overrides.title
+ if overrides and overrides.title
+ else str(document.original_file),
+ "doc_url": "",
+ "correspondent": correspondent_obj.name if correspondent_obj else "",
+ "document_type": document_type_obj.name if document_type_obj else "",
+ "owner_username": owner_obj.username if owner_obj else "",
+ "filename": filename,
+ "current_filename": filename,
+ "added": timezone.localtime(timezone.now()),
+ "created": overrides.created if overrides else None,
+ }
+
+
+def execute_email_action(
+ action: WorkflowAction,
+ document: Document | ConsumableDocument,
+ context: dict,
+ logging_group,
+ original_file: Path,
+ trigger_type: WorkflowTrigger.WorkflowTriggerType,
+) -> None:
+ """
+ Execute an email action for a workflow.
+ """
+
+ if not settings.EMAIL_ENABLED:
+ logger.error(
+ "Email backend has not been configured, cannot send email notifications",
+ extra={"group": logging_group},
+ )
+ return
+
+ subject = (
+ parse_w_workflow_placeholders(
+ action.email.subject,
+ context["correspondent"],
+ context["document_type"],
+ context["owner_username"],
+ context["added"],
+ context["filename"],
+ context["current_filename"],
+ context["created"],
+ context["title"],
+ context["doc_url"],
+ )
+ if action.email.subject
+ else ""
+ )
+ body = (
+ parse_w_workflow_placeholders(
+ action.email.body,
+ context["correspondent"],
+ context["document_type"],
+ context["owner_username"],
+ context["added"],
+ context["filename"],
+ context["current_filename"],
+ context["created"],
+ context["title"],
+ context["doc_url"],
+ )
+ if action.email.body
+ else ""
+ )
+
+ try:
+ attachments: list[EmailAttachment] = []
+ if action.email.include_document:
+ attachment: EmailAttachment | None = None
+ if trigger_type in [
+ WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED,
+ WorkflowTrigger.WorkflowTriggerType.SCHEDULED,
+ ] and isinstance(document, Document):
+ friendly_name = (
+ Path(context["current_filename"]).name
+ if context["current_filename"]
+ else document.source_path.name
+ )
+ attachment = EmailAttachment(
+ path=document.source_path,
+ mime_type=document.mime_type,
+ friendly_name=friendly_name,
+ )
+ elif original_file:
+ friendly_name = (
+ Path(context["current_filename"]).name
+ if context["current_filename"]
+ else original_file.name
+ )
+ attachment = EmailAttachment(
+ path=original_file,
+ mime_type=document.mime_type,
+ friendly_name=friendly_name,
+ )
+ if attachment:
+ attachments = [attachment]
+
+ n_messages = send_email(
+ subject=subject,
+ body=body,
+ to=action.email.to.split(","),
+ attachments=attachments,
+ )
+ logger.debug(
+ f"Sent {n_messages} notification email(s) to {action.email.to}",
+ extra={"group": logging_group},
+ )
+ except Exception as e:
+ logger.exception(
+ f"Error occurred sending notification email: {e}",
+ extra={"group": logging_group},
+ )
+
+
def execute_webhook_action(
    action: WorkflowAction,
    document: Document | ConsumableDocument,
    context: dict,
    logging_group,
    original_file: Path,
):
    """
    Queue the webhook configured on a workflow action.

    Builds the payload (either templated params or a templated body),
    sanitized headers and an optional file attachment, then dispatches
    the actual HTTP POST to the send_webhook celery task. Errors are
    logged but never propagated to the workflow runner.
    """
    try:

        def render(template: str):
            # Expand workflow placeholders using the prepared context.
            return parse_w_workflow_placeholders(
                template,
                context["correspondent"],
                context["document_type"],
                context["owner_username"],
                context["added"],
                context["filename"],
                context["current_filename"],
                context["created"],
                context["title"],
                context["doc_url"],
            )

        payload = {}
        if action.webhook.use_params:
            if action.webhook.params:
                try:
                    # A failure mid-loop leaves earlier params in place and
                    # sends the partial payload.
                    for param_key, param_value in action.webhook.params.items():
                        payload[param_key] = render(param_value)
                except Exception as e:
                    logger.error(
                        f"Error occurred parsing webhook params: {e}",
                        extra={"group": logging_group},
                    )
        elif action.webhook.body:
            payload = render(action.webhook.body)

        header_map = {}
        if action.webhook.headers:
            try:
                header_map = {
                    str(name): str(value)
                    for name, value in action.webhook.headers.items()
                }
            except Exception as e:
                logger.error(
                    f"Error occurred parsing webhook headers: {e}",
                    extra={"group": logging_group},
                )

        attachment = None
        if action.webhook.include_document:
            with original_file.open("rb") as fh:
                attachment = {
                    "file": (
                        str(context["filename"])
                        if context["filename"]
                        else original_file.name,
                        fh.read(),
                        document.mime_type,
                    ),
                }

        send_webhook.delay(
            url=action.webhook.url,
            data=payload,
            headers=header_map,
            files=attachment,
            as_json=action.webhook.as_json,
        )
        logger.debug(
            f"Webhook to {action.webhook.url} queued",
            extra={"group": logging_group},
        )
    except Exception as e:
        logger.exception(
            f"Error occurred sending webhook: {e}",
            extra={"group": logging_group},
        )
--- /dev/null
+import logging
+
+from django.utils import timezone
+from guardian.shortcuts import remove_perm
+
+from documents.data_models import DocumentMetadataOverrides
+from documents.models import CustomFieldInstance
+from documents.models import Document
+from documents.models import WorkflowAction
+from documents.permissions import set_permissions_for_object
+from documents.templating.workflows import parse_w_workflow_placeholders
+
+logger = logging.getLogger("paperless.workflows.mutations")
+
+
def apply_assignment_to_document(
    action: WorkflowAction,
    document: Document,
    doc_tag_ids: list[int],
    logging_group,
):
    """
    Apply assignment actions to a Document instance.

    action: WorkflowAction, annotated with 'has_assign_*' boolean fields
    document: the Document being modified; FK fields, owner and title are
        set in memory only (caller is responsible for saving), while
        permissions and custom fields are written to the database here
    doc_tag_ids: the document's current tag pks; modified in place so the
        caller observes the merged set
    logging_group: opaque value attached to log records via 'extra'
    """
    if action.has_assign_tags:
        # Assigning a tag implicitly assigns all of its ancestor tags too.
        tag_ids_to_add: set[int] = set()
        for tag in action.assign_tags.all():
            tag_ids_to_add.add(tag.pk)
            tag_ids_to_add.update(int(pk) for pk in tag.get_ancestors_pks())

        # Slice assignment mutates the caller's list in place.
        doc_tag_ids[:] = list(set(doc_tag_ids) | tag_ids_to_add)

    if action.assign_correspondent:
        document.correspondent = action.assign_correspondent

    if action.assign_document_type:
        document.document_type = action.assign_document_type

    if action.assign_storage_path:
        document.storage_path = action.assign_storage_path

    if action.assign_owner:
        document.owner = action.assign_owner

    if action.assign_title:
        # Placeholders see the values assigned above (correspondent, owner,
        # document type), not the document's pre-workflow state.
        try:
            document.title = parse_w_workflow_placeholders(
                action.assign_title,
                document.correspondent.name if document.correspondent else "",
                document.document_type.name if document.document_type else "",
                document.owner.username if document.owner else "",
                timezone.localtime(document.added),
                document.original_filename or "",
                document.filename or "",
                document.created,
            )
        except Exception:  # pragma: no cover
            # Keep the existing title when the template cannot be rendered.
            logger.exception(
                f"Error occurred parsing title assignment '{action.assign_title}', falling back to original",
                extra={"group": logging_group},
            )

    if any(
        [
            action.has_assign_view_users,
            action.has_assign_view_groups,
            action.has_assign_change_users,
            action.has_assign_change_groups,
        ],
    ):
        permissions = {
            "view": {
                "users": action.assign_view_users.values_list("id", flat=True),
                "groups": action.assign_view_groups.values_list("id", flat=True),
            },
            "change": {
                "users": action.assign_change_users.values_list("id", flat=True),
                "groups": action.assign_change_groups.values_list("id", flat=True),
            },
        }
        # merge=True: grants are added on top of existing permissions
        # rather than replacing them.
        set_permissions_for_object(
            permissions=permissions,
            object=document,
            merge=True,
        )

    if action.has_assign_custom_fields:
        for field in action.assign_custom_fields.all():
            value_field_name = CustomFieldInstance.get_value_field_name(
                data_type=field.data_type,
            )
            args = {
                value_field_name: action.assign_custom_fields_values.get(
                    str(field.pk),
                    None,
                ),
            }
            # for some reason update_or_create doesn't work here
            instance = CustomFieldInstance.objects.filter(
                field=field,
                document=document,
            ).first()
            # An existing instance is only overwritten when a value was
            # configured; a missing instance is always created, even when
            # the configured value is None.
            if instance and args[value_field_name] is not None:
                setattr(instance, value_field_name, args[value_field_name])
                instance.save()
            elif not instance:
                CustomFieldInstance.objects.create(
                    **args,
                    field=field,
                    document=document,
                )
+
+
def apply_assignment_to_overrides(
    action: WorkflowAction,
    overrides: DocumentMetadataOverrides,
):
    """
    Apply assignment actions to DocumentMetadataOverrides.

    The action is expected to carry the 'has_assign_*' boolean annotations
    produced by the workflow queryset. All changes happen in memory on the
    overrides object.
    """
    if action.has_assign_tags:
        # Assigned tags bring their ancestors along.
        merged_tag_ids = set(overrides.tag_ids or [])
        for tag in action.assign_tags.all():
            merged_tag_ids.add(tag.pk)
            merged_tag_ids.update(int(pk) for pk in tag.get_ancestors_pks())
        overrides.tag_ids = list(merged_tag_ids)

    # Simple FK-style assignments: copy the pk onto the override field.
    for source_attr, target_attr in (
        ("assign_correspondent", "correspondent_id"),
        ("assign_document_type", "document_type_id"),
        ("assign_storage_path", "storage_path_id"),
        ("assign_owner", "owner_id"),
    ):
        assigned = getattr(action, source_attr)
        if assigned:
            setattr(overrides, target_attr, assigned.pk)

    if action.assign_title:
        # The raw template is stored; rendering happens later in the
        # consume pipeline.
        overrides.title = action.assign_title

    if any(
        [
            action.has_assign_view_users,
            action.has_assign_view_groups,
            action.has_assign_change_users,
            action.has_assign_change_groups,
        ],
    ):

        def merged(existing, queryset):
            # Union of already-overridden ids with the action's ids.
            return list(
                set(existing or []) | set(queryset.values_list("id", flat=True)),
            )

        overrides.view_users = merged(overrides.view_users, action.assign_view_users)
        overrides.view_groups = merged(overrides.view_groups, action.assign_view_groups)
        overrides.change_users = merged(
            overrides.change_users,
            action.assign_change_users,
        )
        overrides.change_groups = merged(
            overrides.change_groups,
            action.assign_change_groups,
        )

    if action.has_assign_custom_fields:
        configured_values = {
            field.pk: action.assign_custom_fields_values.get(
                str(field.pk),
                None,
            )
            for field in action.assign_custom_fields.all()
        }
        if overrides.custom_fields is None:
            overrides.custom_fields = configured_values
        else:
            overrides.custom_fields.update(configured_values)
+
+
def apply_removal_to_document(
    action: WorkflowAction,
    document: Document,
    doc_tag_ids: list[int],
):
    """
    Apply removal actions to a Document instance.

    action: WorkflowAction, annotated with 'has_remove_*' boolean fields
    document: the Document being modified; FK fields are cleared in memory
        (caller saves), while permission and custom field removals are
        applied to the database immediately
    doc_tag_ids: the document's current tag pks; modified in place
    """

    if action.remove_all_tags:
        doc_tag_ids.clear()
    else:
        # Removing a tag also removes all of its descendant tags.
        tag_ids_to_remove: set[int] = set()
        for tag in action.remove_tags.all():
            tag_ids_to_remove.add(tag.pk)
            tag_ids_to_remove.update(int(pk) for pk in tag.get_descendants_pks())

        doc_tag_ids[:] = [t for t in doc_tag_ids if t not in tag_ids_to_remove]

    # FK fields are cleared only when "remove all" is set or the current
    # value is explicitly listed on the action.
    if action.remove_all_correspondents or (
        document.correspondent
        and action.remove_correspondents.filter(pk=document.correspondent.pk).exists()
    ):
        document.correspondent = None

    if action.remove_all_document_types or (
        document.document_type
        and action.remove_document_types.filter(pk=document.document_type.pk).exists()
    ):
        document.document_type = None

    if action.remove_all_storage_paths or (
        document.storage_path
        and action.remove_storage_paths.filter(pk=document.storage_path.pk).exists()
    ):
        document.storage_path = None

    if action.remove_all_owners or (
        document.owner and action.remove_owners.filter(pk=document.owner.pk).exists()
    ):
        document.owner = None

    if action.remove_all_permissions:
        # merge=False with empty user/group lists wipes all object-level
        # permissions on the document.
        permissions = {
            "view": {"users": [], "groups": []},
            "change": {"users": [], "groups": []},
        }
        set_permissions_for_object(
            permissions=permissions,
            object=document,
            merge=False,
        )

    if any(
        [
            action.has_remove_view_users,
            action.has_remove_view_groups,
            action.has_remove_change_users,
            action.has_remove_change_groups,
        ],
    ):
        for user in action.remove_view_users.all():
            remove_perm("view_document", user, document)
        for user in action.remove_change_users.all():
            remove_perm("change_document", user, document)
        for group in action.remove_view_groups.all():
            remove_perm("view_document", group, document)
        for group in action.remove_change_groups.all():
            remove_perm("change_document", group, document)

    # NOTE(review): hard_delete presumably bypasses the soft-delete/trash
    # mechanism for custom field instances — confirm against the manager.
    if action.remove_all_custom_fields:
        CustomFieldInstance.objects.filter(document=document).hard_delete()
    elif action.has_remove_custom_fields:
        CustomFieldInstance.objects.filter(
            field__in=action.remove_custom_fields.all(),
            document=document,
        ).hard_delete()
+
+
def apply_removal_to_overrides(
    action: WorkflowAction,
    overrides: DocumentMetadataOverrides,
):
    """
    Apply removal actions to DocumentMetadataOverrides.

    The action is expected to carry the 'has_remove_*' boolean annotations
    produced by the workflow queryset. All changes happen in memory on the
    overrides object.
    """
    if action.remove_all_tags:
        overrides.tag_ids = None
    elif overrides.tag_ids:
        # Removing a tag also removes all of its descendant tags.
        excluded_tag_ids: set[int] = set()
        for tag in action.remove_tags.all():
            excluded_tag_ids.add(tag.pk)
            excluded_tag_ids.update(int(pk) for pk in tag.get_descendants_pks())

        overrides.tag_ids = [
            t for t in overrides.tag_ids if t not in excluded_tag_ids
        ]

    # FK-style overrides are cleared when "remove all" is set or the current
    # override id is explicitly listed on the action.
    for all_flag, removal_attr, id_attr in (
        ("remove_all_correspondents", "remove_correspondents", "correspondent_id"),
        ("remove_all_document_types", "remove_document_types", "document_type_id"),
        ("remove_all_storage_paths", "remove_storage_paths", "storage_path_id"),
        ("remove_all_owners", "remove_owners", "owner_id"),
    ):
        current_id = getattr(overrides, id_attr)
        if getattr(action, all_flag) or (
            current_id
            and getattr(action, removal_attr).filter(pk=current_id).exists()
        ):
            setattr(overrides, id_attr, None)

    if action.remove_all_permissions:
        overrides.view_users = None
        overrides.view_groups = None
        overrides.change_users = None
        overrides.change_groups = None
    elif any(
        [
            action.has_remove_view_users,
            action.has_remove_view_groups,
            action.has_remove_change_users,
            action.has_remove_change_groups,
        ],
    ):

        def strip_ids(current, removal_qs):
            # Drop, in place, any override ids the action marks for removal.
            if current:
                for removed in removal_qs.filter(pk__in=current):
                    current.remove(removed.pk)

        strip_ids(overrides.view_users, action.remove_view_users)
        strip_ids(overrides.change_users, action.remove_change_users)
        strip_ids(overrides.view_groups, action.remove_view_groups)
        strip_ids(overrides.change_groups, action.remove_change_groups)

    if action.remove_all_custom_fields:
        overrides.custom_fields = None
    elif action.has_remove_custom_fields and overrides.custom_fields:
        removable_fields = action.remove_custom_fields.filter(
            pk__in=overrides.custom_fields.keys(),
        )
        for field in removable_fields:
            overrides.custom_fields.pop(field.pk, None)
--- /dev/null
+import logging
+
+from django.db.models import Exists
+from django.db.models import OuterRef
+from django.db.models import Prefetch
+
+from documents.models import Workflow
+from documents.models import WorkflowAction
+from documents.models import WorkflowTrigger
+
+logger = logging.getLogger("paperless.workflows")
+
+
def get_workflows_for_trigger(
    trigger_type: WorkflowTrigger.WorkflowTriggerType,
    workflow_to_run: Workflow | None = None,
):
    """
    Return the workflows to evaluate for a trigger type.

    If a specific workflow was requested it is returned as a one-element
    list. Otherwise all enabled workflows with a trigger of the given type
    are fetched, ordered, with their actions prefetched and annotated with
    'has_<relation>' existence flags consumed by the runner.
    """
    if workflow_to_run is not None:
        return [workflow_to_run]

    # M2M relations whose (non-)emptiness the runner needs to know cheaply;
    # each becomes a 'has_<relation>' Exists() annotation on the action.
    flagged_relations = (
        "assign_tags",
        "assign_view_users",
        "assign_view_groups",
        "assign_change_users",
        "assign_change_groups",
        "assign_custom_fields",
        "remove_view_users",
        "remove_view_groups",
        "remove_change_users",
        "remove_change_groups",
        "remove_custom_fields",
    )
    existence_flags = {
        f"has_{relation}": Exists(
            getattr(WorkflowAction, relation).through.objects.filter(
                workflowaction_id=OuterRef("pk"),
            ),
        )
        for relation in flagged_relations
    }

    action_qs = (
        WorkflowAction.objects.select_related(
            "assign_correspondent",
            "assign_document_type",
            "assign_storage_path",
            "assign_owner",
            "email",
            "webhook",
        )
        .prefetch_related(
            "assign_tags",
            "assign_view_users",
            "assign_view_groups",
            "assign_change_users",
            "assign_change_groups",
            "assign_custom_fields",
            "remove_tags",
            "remove_correspondents",
            "remove_document_types",
            "remove_storage_paths",
            "remove_custom_fields",
            "remove_owners",
        )
        .annotate(**existence_flags)
    )

    return (
        Workflow.objects.filter(enabled=True, triggers__type=trigger_type)
        .prefetch_related(
            Prefetch("actions", queryset=action_qs),
            "triggers",
        )
        .order_by("order")
        .distinct()
    )
--- /dev/null
+import ipaddress
+import logging
+import socket
+from urllib.parse import urlparse
+
+import httpx
+from celery import shared_task
+from django.conf import settings
+
+logger = logging.getLogger("paperless.workflows.webhooks")
+
+
+def _is_public_ip(ip: str) -> bool:
+ try:
+ obj = ipaddress.ip_address(ip)
+ return not (
+ obj.is_private
+ or obj.is_loopback
+ or obj.is_link_local
+ or obj.is_multicast
+ or obj.is_unspecified
+ )
+ except ValueError: # pragma: no cover
+ return False
+
+
+def _resolve_first_ip(host: str) -> str | None:
+ try:
+ info = socket.getaddrinfo(host, None)
+ return info[0][4][0] if info else None
+ except Exception: # pragma: no cover
+ return None
+
+
@shared_task(
    retry_backoff=True,
    autoretry_for=(httpx.HTTPStatusError,),
    max_retries=3,
    throws=(httpx.HTTPError,),
)
def send_webhook(
    url: str,
    data: str | dict,
    headers: dict,
    files: dict,
    *,
    as_json: bool = False,
):
    """
    POST a workflow webhook after validating the destination.

    url: target URL; its scheme must be in WEBHOOKS_ALLOWED_SCHEMES and the
        resolved host must be a public address unless
        WEBHOOKS_ALLOW_INTERNAL_REQUESTS is enabled
    data: payload; sent as JSON when as_json is True, as form data when a
        dict, otherwise as the raw request body
    headers: extra request headers; any 'Host' header is stripped
    files: optional multipart file payload
    as_json: send the payload as a JSON body

    Raises ValueError when the destination is blocked. HTTP status errors
    propagate so celery retries (up to max_retries with backoff).
    """
    p = urlparse(url)
    if p.scheme.lower() not in settings.WEBHOOKS_ALLOWED_SCHEMES or not p.hostname:
        logger.warning("Webhook blocked: invalid scheme/hostname")
        raise ValueError("Invalid URL scheme or hostname.")

    # urlparse lowercases the scheme, so the default-port check is safe.
    port = p.port or (443 if p.scheme == "https" else 80)
    if (
        len(settings.WEBHOOKS_ALLOWED_PORTS) > 0
        and port not in settings.WEBHOOKS_ALLOWED_PORTS
    ):
        logger.warning("Webhook blocked: port not permitted")
        raise ValueError("Destination port not permitted.")

    # SSRF guard: resolve the host once and require a public address.
    # NOTE(review): httpx resolves the name again when sending, leaving a
    # DNS-rebinding TOCTOU window — confirm this is acceptable.
    ip = _resolve_first_ip(p.hostname)
    if not ip or (
        not _is_public_ip(ip) and not settings.WEBHOOKS_ALLOW_INTERNAL_REQUESTS
    ):
        logger.warning("Webhook blocked: destination not allowed")
        raise ValueError("Destination host is not allowed.")

    try:
        post_args = {
            "url": url,
            # Strip any caller-supplied Host header to prevent spoofing.
            "headers": {
                k: v for k, v in (headers or {}).items() if k.lower() != "host"
            },
            "files": files or None,
            "timeout": 5.0,
            # Redirects disabled so the validated destination cannot bounce
            # the request to an unvalidated one.
            "follow_redirects": False,
        }
        if as_json:
            post_args["json"] = data
        elif isinstance(data, dict):
            post_args["data"] = data
        else:
            post_args["content"] = data

        httpx.post(
            **post_args,
        ).raise_for_status()
        logger.info(
            f"Webhook sent to {url}",
        )
    except Exception as e:
        logger.error(
            f"Failed attempt sending webhook to {url}: {e}",
        )
        # Bare re-raise (was `raise e`) keeps the original traceback intact
        # for celery's retry machinery.
        raise