]> git.ipfire.org Git - thirdparty/paperless-ngx.git/commitdiff
Merge serializers
authorshamoon <4887959+shamoon@users.noreply.github.com>
Tue, 8 Apr 2025 23:29:32 +0000 (16:29 -0700)
committershamoon <4887959+shamoon@users.noreply.github.com>
Tue, 8 Apr 2025 23:29:32 +0000 (16:29 -0700)
src/documents/serialisers.py [deleted file]
src/documents/tests/test_api_filter_by_custom_fields.py
src/paperless/serialisers.py
src/paperless/views.py
src/paperless_mail/serialisers.py

diff --git a/src/documents/serialisers.py b/src/documents/serialisers.py
deleted file mode 100644 (file)
index c9e0368..0000000
+++ /dev/null
@@ -1,2322 +0,0 @@
-from __future__ import annotations
-
-import datetime
-import logging
-import math
-import re
-import zoneinfo
-from decimal import Decimal
-from typing import TYPE_CHECKING
-
-import magic
-from celery import states
-from django.conf import settings
-from django.contrib.auth.models import Group
-from django.contrib.auth.models import User
-from django.contrib.contenttypes.models import ContentType
-from django.core.validators import DecimalValidator
-from django.core.validators import MaxLengthValidator
-from django.core.validators import RegexValidator
-from django.core.validators import integer_validator
-from django.utils.crypto import get_random_string
-from django.utils.text import slugify
-from django.utils.translation import gettext as _
-from drf_spectacular.utils import extend_schema_field
-from drf_writable_nested.serializers import NestedUpdateMixin
-from guardian.core import ObjectPermissionChecker
-from guardian.shortcuts import get_users_with_perms
-from guardian.utils import get_group_obj_perms_model
-from guardian.utils import get_user_obj_perms_model
-from rest_framework import fields
-from rest_framework import serializers
-from rest_framework.fields import SerializerMethodField
-
-if settings.AUDIT_LOG_ENABLED:
-    from auditlog.context import set_actor
-
-
-from documents import bulk_edit
-from documents.parsers import is_mime_type_supported
-from documents.permissions import get_groups_with_only_permission
-from documents.permissions import set_permissions_for_object
-from documents.templating.filepath import validate_filepath_template_and_render
-from documents.templating.utils import convert_format_str_to_template_format
-from paperless.data_models import DocumentSource
-from paperless.models import Correspondent
-from paperless.models import CustomField
-from paperless.models import CustomFieldInstance
-from paperless.models import Document
-from paperless.models import DocumentType
-from paperless.models import MatchingModel
-from paperless.models import Note
-from paperless.models import PaperlessTask
-from paperless.models import SavedView
-from paperless.models import SavedViewFilterRule
-from paperless.models import ShareLink
-from paperless.models import StoragePath
-from paperless.models import Tag
-from paperless.models import UiSettings
-from paperless.models import Workflow
-from paperless.models import WorkflowAction
-from paperless.models import WorkflowActionEmail
-from paperless.models import WorkflowActionWebhook
-from paperless.models import WorkflowTrigger
-from paperless.validators import uri_validator
-from paperless.validators import url_validator
-
-if TYPE_CHECKING:
-    from collections.abc import Iterable
-
-logger = logging.getLogger("paperless.serializers")
-
-
-# https://www.django-rest-framework.org/api-guide/serializers/#example
-class DynamicFieldsModelSerializer(serializers.ModelSerializer):
-    """
-    A ModelSerializer that takes an additional `fields` argument that
-    controls which fields should be displayed.
-    """
-
-    def __init__(self, *args, **kwargs):
-        # Don't pass the 'fields' arg up to the superclass
-        fields = kwargs.pop("fields", None)
-
-        # Instantiate the superclass normally
-        super().__init__(*args, **kwargs)
-
-        if fields is not None:
-            # Drop any fields that are not specified in the `fields` argument.
-            allowed = set(fields)
-            existing = set(self.fields)
-            for field_name in existing - allowed:
-                self.fields.pop(field_name)
-
-
-class MatchingModelSerializer(serializers.ModelSerializer):
-    document_count = serializers.IntegerField(read_only=True)
-
-    def get_slug(self, obj) -> str:
-        return slugify(obj.name)
-
-    slug = SerializerMethodField()
-
-    def validate(self, data):
-        # TODO: remove pending https://github.com/encode/django-rest-framework/issues/7173
-        name = data.get(
-            "name",
-            self.instance.name if hasattr(self.instance, "name") else None,
-        )
-        owner = (
-            data["owner"]
-            if "owner" in data
-            else self.user
-            if hasattr(self, "user")
-            else None
-        )
-        pk = self.instance.pk if hasattr(self.instance, "pk") else None
-        if ("name" in data or "owner" in data) and self.Meta.model.objects.filter(
-            name=name,
-            owner=owner,
-        ).exclude(pk=pk).exists():
-            raise serializers.ValidationError(
-                {"error": "Object violates owner / name unique constraint"},
-            )
-        return data
-
-    def validate_match(self, match):
-        if (
-            "matching_algorithm" in self.initial_data
-            and self.initial_data["matching_algorithm"] == MatchingModel.MATCH_REGEX
-        ):
-            try:
-                re.compile(match)
-            except re.error as e:
-                raise serializers.ValidationError(
-                    _("Invalid regular expression: %(error)s") % {"error": str(e.msg)},
-                )
-        return match
-
-
-class SetPermissionsMixin:
-    def _validate_user_ids(self, user_ids):
-        users = User.objects.none()
-        if user_ids is not None:
-            users = User.objects.filter(id__in=user_ids)
-            if not users.count() == len(user_ids):
-                raise serializers.ValidationError(
-                    "Some users in don't exist or were specified twice.",
-                )
-        return users
-
-    def _validate_group_ids(self, group_ids):
-        groups = Group.objects.none()
-        if group_ids is not None:
-            groups = Group.objects.filter(id__in=group_ids)
-            if not groups.count() == len(group_ids):
-                raise serializers.ValidationError(
-                    "Some groups in don't exist or were specified twice.",
-                )
-        return groups
-
-    def validate_set_permissions(self, set_permissions=None):
-        permissions_dict = {
-            "view": {},
-            "change": {},
-        }
-        if set_permissions is not None:
-            for action in ["view", "change"]:
-                if action in set_permissions:
-                    if "users" in set_permissions[action]:
-                        users = set_permissions[action]["users"]
-                        permissions_dict[action]["users"] = self._validate_user_ids(
-                            users,
-                        )
-                    if "groups" in set_permissions[action]:
-                        groups = set_permissions[action]["groups"]
-                        permissions_dict[action]["groups"] = self._validate_group_ids(
-                            groups,
-                        )
-                else:
-                    del permissions_dict[action]
-        return permissions_dict
-
-    def _set_permissions(self, permissions, object):
-        set_permissions_for_object(permissions, object)
-
-
-class SerializerWithPerms(serializers.Serializer):
-    def __init__(self, *args, **kwargs):
-        self.user = kwargs.pop("user", None)
-        self.full_perms = kwargs.pop("full_perms", False)
-        self.all_fields = kwargs.pop("all_fields", False)
-        super().__init__(*args, **kwargs)
-
-
-@extend_schema_field(
-    field={
-        "type": "object",
-        "properties": {
-            "view": {
-                "type": "object",
-                "properties": {
-                    "users": {
-                        "type": "array",
-                        "items": {"type": "integer"},
-                    },
-                    "groups": {
-                        "type": "array",
-                        "items": {"type": "integer"},
-                    },
-                },
-            },
-            "change": {
-                "type": "object",
-                "properties": {
-                    "users": {
-                        "type": "array",
-                        "items": {"type": "integer"},
-                    },
-                    "groups": {
-                        "type": "array",
-                        "items": {"type": "integer"},
-                    },
-                },
-            },
-        },
-    },
-)
-class SetPermissionsSerializer(serializers.DictField):
-    pass
-
-
-class OwnedObjectSerializer(
-    SerializerWithPerms,
-    serializers.ModelSerializer,
-    SetPermissionsMixin,
-):
-    def __init__(self, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-
-        if not self.all_fields:
-            try:
-                if self.full_perms:
-                    self.fields.pop("user_can_change")
-                    self.fields.pop("is_shared_by_requester")
-                else:
-                    self.fields.pop("permissions")
-            except KeyError:
-                pass
-
-    @extend_schema_field(
-        field={
-            "type": "object",
-            "properties": {
-                "view": {
-                    "type": "object",
-                    "properties": {
-                        "users": {
-                            "type": "array",
-                            "items": {"type": "integer"},
-                        },
-                        "groups": {
-                            "type": "array",
-                            "items": {"type": "integer"},
-                        },
-                    },
-                },
-                "change": {
-                    "type": "object",
-                    "properties": {
-                        "users": {
-                            "type": "array",
-                            "items": {"type": "integer"},
-                        },
-                        "groups": {
-                            "type": "array",
-                            "items": {"type": "integer"},
-                        },
-                    },
-                },
-            },
-        },
-    )
-    def get_permissions(self, obj) -> dict:
-        view_codename = f"view_{obj.__class__.__name__.lower()}"
-        change_codename = f"change_{obj.__class__.__name__.lower()}"
-
-        return {
-            "view": {
-                "users": get_users_with_perms(
-                    obj,
-                    only_with_perms_in=[view_codename],
-                    with_group_users=False,
-                ).values_list("id", flat=True),
-                "groups": get_groups_with_only_permission(
-                    obj,
-                    codename=view_codename,
-                ).values_list("id", flat=True),
-            },
-            "change": {
-                "users": get_users_with_perms(
-                    obj,
-                    only_with_perms_in=[change_codename],
-                    with_group_users=False,
-                ).values_list("id", flat=True),
-                "groups": get_groups_with_only_permission(
-                    obj,
-                    codename=change_codename,
-                ).values_list("id", flat=True),
-            },
-        }
-
-    def get_user_can_change(self, obj) -> bool:
-        checker = ObjectPermissionChecker(self.user) if self.user is not None else None
-        return (
-            obj.owner is None
-            or obj.owner == self.user
-            or (
-                self.user is not None
-                and checker.has_perm(f"change_{obj.__class__.__name__.lower()}", obj)
-            )
-        )
-
-    @staticmethod
-    def get_shared_object_pks(objects: Iterable):
-        """
-        Return the primary keys of the subset of objects that are shared.
-        """
-        try:
-            first_obj = next(iter(objects))
-        except StopIteration:
-            return set()
-
-        ctype = ContentType.objects.get_for_model(first_obj)
-        object_pks = list(obj.pk for obj in objects)
-        pk_type = type(first_obj.pk)
-
-        def get_pks_for_permission_type(model):
-            return map(
-                pk_type,  # coerce the pk to be the same type of the provided objects
-                model.objects.filter(
-                    content_type=ctype,
-                    object_pk__in=object_pks,
-                )
-                .values_list("object_pk", flat=True)
-                .distinct(),
-            )
-
-        UserObjectPermission = get_user_obj_perms_model()
-        GroupObjectPermission = get_group_obj_perms_model()
-        user_permission_pks = get_pks_for_permission_type(UserObjectPermission)
-        group_permission_pks = get_pks_for_permission_type(GroupObjectPermission)
-
-        return set(user_permission_pks) | set(group_permission_pks)
-
-    def get_is_shared_by_requester(self, obj: Document) -> bool:
-        # First check the context to see if `shared_object_pks` is set by the parent.
-        shared_object_pks = self.context.get("shared_object_pks")
-        # If not just check if the current object is shared.
-        if shared_object_pks is None:
-            shared_object_pks = self.get_shared_object_pks([obj])
-        return obj.owner == self.user and obj.id in shared_object_pks
-
-    permissions = SerializerMethodField(read_only=True)
-    user_can_change = SerializerMethodField(read_only=True)
-    is_shared_by_requester = SerializerMethodField(read_only=True)
-
-    set_permissions = SetPermissionsSerializer(
-        label="Set permissions",
-        allow_empty=True,
-        required=False,
-        write_only=True,
-    )
-    # other methods in mixin
-
-    def validate_unique_together(self, validated_data, instance=None):
-        # workaround for https://github.com/encode/django-rest-framework/issues/9358
-        if "owner" in validated_data and "name" in self.Meta.fields:
-            name = validated_data.get("name", instance.name if instance else None)
-            objects = (
-                self.Meta.model.objects.exclude(pk=instance.pk)
-                if instance
-                else self.Meta.model.objects.all()
-            )
-            not_unique = objects.filter(
-                owner=validated_data["owner"],
-                name=name,
-            ).exists()
-            if not_unique:
-                raise serializers.ValidationError(
-                    {"error": "Object violates owner / name unique constraint"},
-                )
-
-    def create(self, validated_data):
-        # default to current user if not set
-        request = self.context.get("request")
-        if (
-            "owner" not in validated_data
-            or (request is not None and "owner" not in request.data)
-        ) and self.user:
-            validated_data["owner"] = self.user
-        permissions = None
-        if "set_permissions" in validated_data:
-            permissions = validated_data.pop("set_permissions")
-        self.validate_unique_together(validated_data)
-        instance = super().create(validated_data)
-        if permissions is not None:
-            self._set_permissions(permissions, instance)
-        return instance
-
-    def update(self, instance, validated_data):
-        if "set_permissions" in validated_data:
-            self._set_permissions(validated_data["set_permissions"], instance)
-        self.validate_unique_together(validated_data, instance)
-        return super().update(instance, validated_data)
-
-
-class OwnedObjectListSerializer(serializers.ListSerializer):
-    def to_representation(self, documents):
-        self.child.context["shared_object_pks"] = self.child.get_shared_object_pks(
-            documents,
-        )
-        return super().to_representation(documents)
-
-
-class CorrespondentSerializer(MatchingModelSerializer, OwnedObjectSerializer):
-    last_correspondence = serializers.DateTimeField(read_only=True, required=False)
-
-    class Meta:
-        model = Correspondent
-        fields = (
-            "id",
-            "slug",
-            "name",
-            "match",
-            "matching_algorithm",
-            "is_insensitive",
-            "document_count",
-            "last_correspondence",
-            "owner",
-            "permissions",
-            "user_can_change",
-            "set_permissions",
-        )
-
-
-class DocumentTypeSerializer(MatchingModelSerializer, OwnedObjectSerializer):
-    class Meta:
-        model = DocumentType
-        fields = (
-            "id",
-            "slug",
-            "name",
-            "match",
-            "matching_algorithm",
-            "is_insensitive",
-            "document_count",
-            "owner",
-            "permissions",
-            "user_can_change",
-            "set_permissions",
-        )
-
-
-class DeprecatedColors:
-    COLOURS = (
-        (1, "#a6cee3"),
-        (2, "#1f78b4"),
-        (3, "#b2df8a"),
-        (4, "#33a02c"),
-        (5, "#fb9a99"),
-        (6, "#e31a1c"),
-        (7, "#fdbf6f"),
-        (8, "#ff7f00"),
-        (9, "#cab2d6"),
-        (10, "#6a3d9a"),
-        (11, "#b15928"),
-        (12, "#000000"),
-        (13, "#cccccc"),
-    )
-
-
-@extend_schema_field(
-    serializers.ChoiceField(
-        choices=DeprecatedColors.COLOURS,
-    ),
-)
-class ColorField(serializers.Field):
-    def to_internal_value(self, data):
-        for id, color in DeprecatedColors.COLOURS:
-            if id == data:
-                return color
-        raise serializers.ValidationError
-
-    def to_representation(self, value):
-        for id, color in DeprecatedColors.COLOURS:
-            if color == value:
-                return id
-        return 1
-
-
-class TagSerializerVersion1(MatchingModelSerializer, OwnedObjectSerializer):
-    colour = ColorField(source="color", default="#a6cee3")
-
-    class Meta:
-        model = Tag
-        fields = (
-            "id",
-            "slug",
-            "name",
-            "colour",
-            "match",
-            "matching_algorithm",
-            "is_insensitive",
-            "is_inbox_tag",
-            "document_count",
-            "owner",
-            "permissions",
-            "user_can_change",
-            "set_permissions",
-        )
-
-
-class TagSerializer(MatchingModelSerializer, OwnedObjectSerializer):
-    def get_text_color(self, obj) -> str:
-        try:
-            h = obj.color.lstrip("#")
-            rgb = tuple(int(h[i : i + 2], 16) / 256 for i in (0, 2, 4))
-            luminance = math.sqrt(
-                0.299 * math.pow(rgb[0], 2)
-                + 0.587 * math.pow(rgb[1], 2)
-                + 0.114 * math.pow(rgb[2], 2),
-            )
-            return "#ffffff" if luminance < 0.53 else "#000000"
-        except ValueError:
-            return "#000000"
-
-    text_color = serializers.SerializerMethodField()
-
-    class Meta:
-        model = Tag
-        fields = (
-            "id",
-            "slug",
-            "name",
-            "color",
-            "text_color",
-            "match",
-            "matching_algorithm",
-            "is_insensitive",
-            "is_inbox_tag",
-            "document_count",
-            "owner",
-            "permissions",
-            "user_can_change",
-            "set_permissions",
-        )
-
-    def validate_color(self, color):
-        regex = r"#[0-9a-fA-F]{6}"
-        if not re.match(regex, color):
-            raise serializers.ValidationError(_("Invalid color."))
-        return color
-
-
-class CorrespondentField(serializers.PrimaryKeyRelatedField):
-    def get_queryset(self):
-        return Correspondent.objects.all()
-
-
-class TagsField(serializers.PrimaryKeyRelatedField):
-    def get_queryset(self):
-        return Tag.objects.all()
-
-
-class DocumentTypeField(serializers.PrimaryKeyRelatedField):
-    def get_queryset(self):
-        return DocumentType.objects.all()
-
-
-class StoragePathField(serializers.PrimaryKeyRelatedField):
-    def get_queryset(self):
-        return StoragePath.objects.all()
-
-
-class CustomFieldSerializer(serializers.ModelSerializer):
-    def __init__(self, *args, **kwargs):
-        context = kwargs.get("context")
-        self.api_version = int(
-            context.get("request").version
-            if context and context.get("request")
-            else settings.REST_FRAMEWORK["DEFAULT_VERSION"],
-        )
-        super().__init__(*args, **kwargs)
-
-    data_type = serializers.ChoiceField(
-        choices=CustomField.FieldDataType,
-        read_only=False,
-    )
-
-    document_count = serializers.IntegerField(read_only=True)
-
-    class Meta:
-        model = CustomField
-        fields = [
-            "id",
-            "name",
-            "data_type",
-            "extra_data",
-            "document_count",
-        ]
-
-    def validate(self, attrs):
-        # TODO: remove pending https://github.com/encode/django-rest-framework/issues/7173
-        name = attrs.get(
-            "name",
-            self.instance.name if hasattr(self.instance, "name") else None,
-        )
-        objects = (
-            self.Meta.model.objects.exclude(
-                pk=self.instance.pk,
-            )
-            if self.instance is not None
-            else self.Meta.model.objects.all()
-        )
-        if ("name" in attrs) and objects.filter(
-            name=name,
-        ).exists():
-            raise serializers.ValidationError(
-                {"error": "Object violates name unique constraint"},
-            )
-        if (
-            "data_type" in attrs
-            and attrs["data_type"] == CustomField.FieldDataType.SELECT
-        ) or (
-            self.instance
-            and self.instance.data_type == CustomField.FieldDataType.SELECT
-        ):
-            if (
-                "extra_data" not in attrs
-                or "select_options" not in attrs["extra_data"]
-                or not isinstance(attrs["extra_data"]["select_options"], list)
-                or len(attrs["extra_data"]["select_options"]) == 0
-                or not all(
-                    len(option.get("label", "")) > 0
-                    for option in attrs["extra_data"]["select_options"]
-                )
-            ):
-                raise serializers.ValidationError(
-                    {"error": "extra_data.select_options must be a valid list"},
-                )
-            # labels are valid, generate ids if not present
-            for option in attrs["extra_data"]["select_options"]:
-                if option.get("id") is None:
-                    option["id"] = get_random_string(length=16)
-        elif (
-            "data_type" in attrs
-            and attrs["data_type"] == CustomField.FieldDataType.MONETARY
-            and "extra_data" in attrs
-            and "default_currency" in attrs["extra_data"]
-            and attrs["extra_data"]["default_currency"] is not None
-            and (
-                not isinstance(attrs["extra_data"]["default_currency"], str)
-                or (
-                    len(attrs["extra_data"]["default_currency"]) > 0
-                    and len(attrs["extra_data"]["default_currency"]) != 3
-                )
-            )
-        ):
-            raise serializers.ValidationError(
-                {"error": "extra_data.default_currency must be a 3-character string"},
-            )
-        return super().validate(attrs)
-
-    def to_internal_value(self, data):
-        ret = super().to_internal_value(data)
-
-        if (
-            self.api_version < 7
-            and ret.get("data_type", "") == CustomField.FieldDataType.SELECT
-            and isinstance(ret.get("extra_data", {}).get("select_options"), list)
-        ):
-            ret["extra_data"]["select_options"] = [
-                {
-                    "label": option,
-                    "id": get_random_string(length=16),
-                }
-                for option in ret["extra_data"]["select_options"]
-            ]
-
-        return ret
-
-    def to_representation(self, instance):
-        ret = super().to_representation(instance)
-
-        if (
-            self.api_version < 7
-            and instance.data_type == CustomField.FieldDataType.SELECT
-        ):
-            # Convert the select options with ids to a list of strings
-            ret["extra_data"]["select_options"] = [
-                option["label"] for option in ret["extra_data"]["select_options"]
-            ]
-
-        return ret
-
-
-class ReadWriteSerializerMethodField(serializers.SerializerMethodField):
-    """
-    Based on https://stackoverflow.com/a/62579804
-    """
-
-    def __init__(self, method_name=None, *args, **kwargs):
-        self.method_name = method_name
-        kwargs["source"] = "*"
-        super(serializers.SerializerMethodField, self).__init__(*args, **kwargs)
-
-    def to_internal_value(self, data):
-        return {self.field_name: data}
-
-
-class CustomFieldInstanceSerializer(serializers.ModelSerializer):
-    field = serializers.PrimaryKeyRelatedField(queryset=CustomField.objects.all())
-    value = ReadWriteSerializerMethodField(allow_null=True)
-
-    def create(self, validated_data):
-        # An instance is attached to a document
-        document: Document = validated_data["document"]
-        # And to a CustomField
-        custom_field: CustomField = validated_data["field"]
-        # This key must exist, as it is validated
-        data_store_name = CustomFieldInstance.get_value_field_name(
-            custom_field.data_type,
-        )
-
-        if custom_field.data_type == CustomField.FieldDataType.DOCUMENTLINK:
-            # prior to update so we can look for any docs that are going to be removed
-            bulk_edit.reflect_doclinks(document, custom_field, validated_data["value"])
-
-        # Actually update or create the instance, providing the value
-        # to fill in the correct attribute based on the type
-        instance, _ = CustomFieldInstance.objects.update_or_create(
-            document=document,
-            field=custom_field,
-            defaults={data_store_name: validated_data["value"]},
-        )
-        return instance
-
-    def get_value(self, obj: CustomFieldInstance) -> str | int | float | dict | None:
-        return obj.value
-
-    def validate(self, data):
-        """
-        Probably because we're kind of doing it odd, validation from the model
-        doesn't run against the field "value", so we have to re-create it here.
-
-        Don't like it, but it is better than returning an HTTP 500 when the database
-        hates the value
-        """
-        data = super().validate(data)
-        field: CustomField = data["field"]
-        if "value" in data and data["value"] is not None:
-            if (
-                field.data_type == CustomField.FieldDataType.URL
-                and len(data["value"]) > 0
-            ):
-                uri_validator(data["value"])
-            elif field.data_type == CustomField.FieldDataType.INT:
-                integer_validator(data["value"])
-            elif (
-                field.data_type == CustomField.FieldDataType.MONETARY
-                and data["value"] != ""
-            ):
-                try:
-                    # First try to validate as a number from legacy format
-                    DecimalValidator(max_digits=12, decimal_places=2)(
-                        Decimal(str(data["value"])),
-                    )
-                except Exception:
-                    # If that fails, try to validate as a monetary string
-                    RegexValidator(
-                        regex=r"^[A-Z]{3}-?\d+(\.\d{1,2})$",
-                        message="Must be a two-decimal number with optional currency code e.g. GBP123.45",
-                    )(data["value"])
-            elif field.data_type == CustomField.FieldDataType.STRING:
-                MaxLengthValidator(limit_value=128)(data["value"])
-            elif field.data_type == CustomField.FieldDataType.SELECT:
-                select_options = field.extra_data["select_options"]
-                try:
-                    next(
-                        option
-                        for option in select_options
-                        if option["id"] == data["value"]
-                    )
-                except Exception:
-                    raise serializers.ValidationError(
-                        f"Value must be an id of an element in {select_options}",
-                    )
-            elif field.data_type == CustomField.FieldDataType.DOCUMENTLINK:
-                if not (isinstance(data["value"], list) or data["value"] is None):
-                    raise serializers.ValidationError(
-                        "Value must be a list",
-                    )
-                doc_ids = data["value"]
-                if Document.objects.filter(id__in=doc_ids).count() != len(
-                    data["value"],
-                ):
-                    raise serializers.ValidationError(
-                        "Some documents in value don't exist or were specified twice.",
-                    )
-
-        return data
-
-    def get_api_version(self):
-        return int(
-            self.context.get("request").version
-            if self.context.get("request")
-            else settings.REST_FRAMEWORK["DEFAULT_VERSION"],
-        )
-
-    def to_internal_value(self, data):
-        ret = super().to_internal_value(data)
-
-        if (
-            self.get_api_version() < 7
-            and ret.get("field").data_type == CustomField.FieldDataType.SELECT
-            and ret.get("value") is not None
-        ):
-            # Convert the index of the option in the field.extra_data["select_options"]
-            # list to the options unique id
-            ret["value"] = ret.get("field").extra_data["select_options"][ret["value"]][
-                "id"
-            ]
-
-        return ret
-
-    def to_representation(self, instance):
-        ret = super().to_representation(instance)
-
-        if (
-            self.get_api_version() < 7
-            and instance.field.data_type == CustomField.FieldDataType.SELECT
-        ):
-            # return the index of the option in the field.extra_data["select_options"] list
-            ret["value"] = next(
-                (
-                    idx
-                    for idx, option in enumerate(
-                        instance.field.extra_data["select_options"],
-                    )
-                    if option["id"] == instance.value
-                ),
-                None,
-            )
-
-        return ret
-
-    class Meta:
-        model = CustomFieldInstance
-        fields = [
-            "value",
-            "field",
-        ]
-
-
-class BasicUserSerializer(serializers.ModelSerializer):
-    # Different than paperless.serializers.UserSerializer
-    class Meta:
-        model = User
-        fields = ["id", "username", "first_name", "last_name"]
-
-
-class NotesSerializer(serializers.ModelSerializer):
-    user = BasicUserSerializer(read_only=True)
-
-    class Meta:
-        model = Note
-        fields = ["id", "note", "created", "user"]
-        ordering = ["-created"]
-
-
-class DocumentSerializer(
-    OwnedObjectSerializer,
-    NestedUpdateMixin,
-    DynamicFieldsModelSerializer,
-):
-    correspondent = CorrespondentField(allow_null=True)
-    tags = TagsField(many=True)
-    document_type = DocumentTypeField(allow_null=True)
-    storage_path = StoragePathField(allow_null=True)
-
-    original_file_name = SerializerMethodField()
-    archived_file_name = SerializerMethodField()
-    created_date = serializers.DateField(required=False)
-    page_count = SerializerMethodField()
-
-    notes = NotesSerializer(many=True, required=False, read_only=True)
-
-    custom_fields = CustomFieldInstanceSerializer(
-        many=True,
-        allow_null=False,
-        required=False,
-    )
-
-    owner = serializers.PrimaryKeyRelatedField(
-        queryset=User.objects.all(),
-        required=False,
-        allow_null=True,
-    )
-
-    remove_inbox_tags = serializers.BooleanField(
-        default=False,
-        write_only=True,
-        allow_null=True,
-        required=False,
-    )
-
-    def get_page_count(self, obj) -> int | None:
-        return obj.page_count
-
-    def get_original_file_name(self, obj) -> str | None:
-        return obj.original_filename
-
-    def get_archived_file_name(self, obj) -> str | None:
-        if obj.has_archive_version:
-            return obj.get_public_filename(archive=True)
-        else:
-            return None
-
-    def to_representation(self, instance):
-        doc = super().to_representation(instance)
-        if self.truncate_content and "content" in self.fields:
-            doc["content"] = doc.get("content")[0:550]
-        return doc
-
-    def validate(self, attrs):
-        if (
-            "archive_serial_number" in attrs
-            and attrs["archive_serial_number"] is not None
-            and len(str(attrs["archive_serial_number"])) > 0
-            and Document.deleted_objects.filter(
-                archive_serial_number=attrs["archive_serial_number"],
-            ).exists()
-        ):
-            raise serializers.ValidationError(
-                {
-                    "archive_serial_number": [
-                        "Document with this Archive Serial Number already exists in the trash.",
-                    ],
-                },
-            )
-        return super().validate(attrs)
-
-    def update(self, instance: Document, validated_data):
-        if "created_date" in validated_data and "created" not in validated_data:
-            new_datetime = datetime.datetime.combine(
-                validated_data.get("created_date"),
-                datetime.time(0, 0, 0, 0, zoneinfo.ZoneInfo(settings.TIME_ZONE)),
-            )
-            instance.created = new_datetime
-            instance.save()
-        if "created_date" in validated_data:
-            validated_data.pop("created_date")
-        if instance.custom_fields.count() > 0 and "custom_fields" in validated_data:
-            incoming_custom_fields = [
-                field["field"] for field in validated_data["custom_fields"]
-            ]
-            for custom_field_instance in instance.custom_fields.filter(
-                field__data_type=CustomField.FieldDataType.DOCUMENTLINK,
-            ):
-                if (
-                    custom_field_instance.field not in incoming_custom_fields
-                    and custom_field_instance.value is not None
-                ):
-                    # Doc link field is being removed entirely
-                    for doc_id in custom_field_instance.value:
-                        bulk_edit.remove_doclink(
-                            instance,
-                            custom_field_instance.field,
-                            doc_id,
-                        )
-        if validated_data.get("remove_inbox_tags"):
-            tag_ids_being_added = (
-                [
-                    tag.id
-                    for tag in validated_data["tags"]
-                    if tag not in instance.tags.all()
-                ]
-                if "tags" in validated_data
-                else []
-            )
-            inbox_tags_not_being_added = Tag.objects.filter(is_inbox_tag=True).exclude(
-                id__in=tag_ids_being_added,
-            )
-            if "tags" in validated_data:
-                validated_data["tags"] = [
-                    tag
-                    for tag in validated_data["tags"]
-                    if tag not in inbox_tags_not_being_added
-                ]
-            else:
-                validated_data["tags"] = [
-                    tag
-                    for tag in instance.tags.all()
-                    if tag not in inbox_tags_not_being_added
-                ]
-        if settings.AUDIT_LOG_ENABLED:
-            with set_actor(self.user):
-                super().update(instance, validated_data)
-        else:
-            super().update(instance, validated_data)
-        # hard delete custom field instances that were soft deleted
-        CustomFieldInstance.deleted_objects.filter(document=instance).delete()
-        return instance
-
-    def __init__(self, *args, **kwargs):
-        self.truncate_content = kwargs.pop("truncate_content", False)
-
-        # return full permissions if we're doing a PATCH or PUT
-        context = kwargs.get("context")
-        if context is not None and (
-            context.get("request").method == "PATCH"
-            or context.get("request").method == "PUT"
-        ):
-            kwargs["full_perms"] = True
-
-        super().__init__(*args, **kwargs)
-
-    class Meta:
-        model = Document
-        fields = (
-            "id",
-            "correspondent",
-            "document_type",
-            "storage_path",
-            "title",
-            "content",
-            "tags",
-            "created",
-            "created_date",
-            "modified",
-            "added",
-            "deleted_at",
-            "archive_serial_number",
-            "original_file_name",
-            "archived_file_name",
-            "owner",
-            "permissions",
-            "user_can_change",
-            "is_shared_by_requester",
-            "set_permissions",
-            "notes",
-            "custom_fields",
-            "remove_inbox_tags",
-            "page_count",
-            "mime_type",
-        )
-        list_serializer_class = OwnedObjectListSerializer
-
-
-class SearchResultListSerializer(serializers.ListSerializer):
-    def to_representation(self, hits):
-        document_ids = [hit["id"] for hit in hits]
-        # Fetch all Document objects in the list in one SQL query.
-        documents = self.child.fetch_documents(document_ids)
-        self.child.context["documents"] = documents
-        # Also check if they are shared with other users / groups.
-        self.child.context["shared_object_pks"] = self.child.get_shared_object_pks(
-            documents.values(),
-        )
-
-        return super().to_representation(hits)
-
-
-class SearchResultSerializer(DocumentSerializer):
-    @staticmethod
-    def fetch_documents(ids):
-        """
-        Return a dict that maps given document IDs to Document objects.
-        """
-        return {
-            document.id: document
-            for document in Document.objects.select_related(
-                "correspondent",
-                "storage_path",
-                "document_type",
-                "owner",
-            )
-            .prefetch_related("tags", "custom_fields", "notes")
-            .filter(id__in=ids)
-        }
-
-    def to_representation(self, hit):
-        # Again we first check if the parent has already fetched the documents.
-        documents = self.context.get("documents")
-        # Otherwise we fetch this document.
-        if documents is None:  # pragma: no cover
-            # In practice we only serialize **lists** of whoosh.searching.Hit.
-            # I'm keeping this check for completeness but marking it no cover for now.
-            documents = self.fetch_documents([hit["id"]])
-        document = documents[hit["id"]]
-
-        notes = ",".join(
-            [str(c.note) for c in document.notes.all()],
-        )
-        r = super().to_representation(document)
-        r["__search_hit__"] = {
-            "score": hit.score,
-            "highlights": hit.highlights("content", text=document.content),
-            "note_highlights": (
-                hit.highlights("notes", text=notes) if document else None
-            ),
-            "rank": hit.rank,
-        }
-
-        return r
-
-    class Meta(DocumentSerializer.Meta):
-        list_serializer_class = SearchResultListSerializer
-
-
-class SavedViewFilterRuleSerializer(serializers.ModelSerializer):
-    class Meta:
-        model = SavedViewFilterRule
-        fields = ["rule_type", "value"]
-
-
-class SavedViewSerializer(OwnedObjectSerializer):
-    filter_rules = SavedViewFilterRuleSerializer(many=True)
-
-    class Meta:
-        model = SavedView
-        fields = [
-            "id",
-            "name",
-            "show_on_dashboard",
-            "show_in_sidebar",
-            "sort_field",
-            "sort_reverse",
-            "filter_rules",
-            "page_size",
-            "display_mode",
-            "display_fields",
-            "owner",
-            "permissions",
-            "user_can_change",
-            "set_permissions",
-        ]
-
-    def validate(self, attrs):
-        attrs = super().validate(attrs)
-        if "display_fields" in attrs and attrs["display_fields"] is not None:
-            for field in attrs["display_fields"]:
-                if (
-                    SavedView.DisplayFields.CUSTOM_FIELD[:-2] in field
-                ):  # i.e. check for 'custom_field_' prefix
-                    field_id = int(re.search(r"\d+", field)[0])
-                    if not CustomField.objects.filter(id=field_id).exists():
-                        raise serializers.ValidationError(
-                            f"Invalid field: {field}",
-                        )
-                elif field not in SavedView.DisplayFields.values:
-                    raise serializers.ValidationError(
-                        f"Invalid field: {field}",
-                    )
-        return attrs
-
-    def update(self, instance, validated_data):
-        if "filter_rules" in validated_data:
-            rules_data = validated_data.pop("filter_rules")
-        else:
-            rules_data = None
-        if "user" in validated_data:
-            # backwards compatibility
-            validated_data["owner"] = validated_data.pop("user")
-        if (
-            "display_fields" in validated_data
-            and isinstance(
-                validated_data["display_fields"],
-                list,
-            )
-            and len(validated_data["display_fields"]) == 0
-        ):
-            validated_data["display_fields"] = None
-        super().update(instance, validated_data)
-        if rules_data is not None:
-            SavedViewFilterRule.objects.filter(saved_view=instance).delete()
-            for rule_data in rules_data:
-                SavedViewFilterRule.objects.create(saved_view=instance, **rule_data)
-        return instance
-
-    def create(self, validated_data):
-        rules_data = validated_data.pop("filter_rules")
-        if "user" in validated_data:
-            # backwards compatibility
-            validated_data["owner"] = validated_data.pop("user")
-        saved_view = SavedView.objects.create(**validated_data)
-        for rule_data in rules_data:
-            SavedViewFilterRule.objects.create(saved_view=saved_view, **rule_data)
-        return saved_view
-
-
-class DocumentListSerializer(serializers.Serializer):
-    documents = serializers.ListField(
-        required=True,
-        label="Documents",
-        write_only=True,
-        child=serializers.IntegerField(),
-    )
-
-    def _validate_document_id_list(self, documents, name="documents"):
-        if not isinstance(documents, list):
-            raise serializers.ValidationError(f"{name} must be a list")
-        if not all(isinstance(i, int) for i in documents):
-            raise serializers.ValidationError(f"{name} must be a list of integers")
-        count = Document.objects.filter(id__in=documents).count()
-        if not count == len(documents):
-            raise serializers.ValidationError(
-                f"Some documents in {name} don't exist or were specified twice.",
-            )
-
-    def validate_documents(self, documents):
-        self._validate_document_id_list(documents)
-        return documents
-
-
-class BulkEditSerializer(
-    SerializerWithPerms,
-    DocumentListSerializer,
-    SetPermissionsMixin,
-):
-    method = serializers.ChoiceField(
-        choices=[
-            "set_correspondent",
-            "set_document_type",
-            "set_storage_path",
-            "add_tag",
-            "remove_tag",
-            "modify_tags",
-            "modify_custom_fields",
-            "delete",
-            "reprocess",
-            "set_permissions",
-            "rotate",
-            "merge",
-            "split",
-            "delete_pages",
-        ],
-        label="Method",
-        write_only=True,
-    )
-
-    parameters = serializers.DictField(allow_empty=True, default={}, write_only=True)
-
-    def _validate_tag_id_list(self, tags, name="tags"):
-        if not isinstance(tags, list):
-            raise serializers.ValidationError(f"{name} must be a list")
-        if not all(isinstance(i, int) for i in tags):
-            raise serializers.ValidationError(f"{name} must be a list of integers")
-        count = Tag.objects.filter(id__in=tags).count()
-        if not count == len(tags):
-            raise serializers.ValidationError(
-                f"Some tags in {name} don't exist or were specified twice.",
-            )
-
-    def _validate_custom_field_id_list_or_dict(
-        self,
-        custom_fields,
-        name="custom_fields",
-    ):
-        ids = custom_fields
-        if isinstance(custom_fields, dict):
-            try:
-                ids = [int(i[0]) for i in custom_fields.items()]
-            except Exception as e:
-                logger.exception(f"Error validating custom fields: {e}")
-                raise serializers.ValidationError(
-                    f"{name} must be a list of integers or a dict of id:value pairs, see the log for details",
-                )
-        elif not isinstance(custom_fields, list) or not all(
-            isinstance(i, int) for i in ids
-        ):
-            raise serializers.ValidationError(
-                f"{name} must be a list of integers or a dict of id:value pairs",
-            )
-        count = CustomField.objects.filter(id__in=ids).count()
-        if not count == len(ids):
-            raise serializers.ValidationError(
-                f"Some custom fields in {name} don't exist or were specified twice.",
-            )
-
-    def validate_method(self, method):
-        if method == "set_correspondent":
-            return bulk_edit.set_correspondent
-        elif method == "set_document_type":
-            return bulk_edit.set_document_type
-        elif method == "set_storage_path":
-            return bulk_edit.set_storage_path
-        elif method == "add_tag":
-            return bulk_edit.add_tag
-        elif method == "remove_tag":
-            return bulk_edit.remove_tag
-        elif method == "modify_tags":
-            return bulk_edit.modify_tags
-        elif method == "modify_custom_fields":
-            return bulk_edit.modify_custom_fields
-        elif method == "delete":
-            return bulk_edit.delete
-        elif method == "redo_ocr" or method == "reprocess":
-            return bulk_edit.reprocess
-        elif method == "set_permissions":
-            return bulk_edit.set_permissions
-        elif method == "rotate":
-            return bulk_edit.rotate
-        elif method == "merge":
-            return bulk_edit.merge
-        elif method == "split":
-            return bulk_edit.split
-        elif method == "delete_pages":
-            return bulk_edit.delete_pages
-        else:
-            raise serializers.ValidationError("Unsupported method.")
-
-    def _validate_parameters_tags(self, parameters):
-        if "tag" in parameters:
-            tag_id = parameters["tag"]
-            try:
-                Tag.objects.get(id=tag_id)
-            except Tag.DoesNotExist:
-                raise serializers.ValidationError("Tag does not exist")
-        else:
-            raise serializers.ValidationError("tag not specified")
-
-    def _validate_parameters_document_type(self, parameters):
-        if "document_type" in parameters:
-            document_type_id = parameters["document_type"]
-            if document_type_id is None:
-                # None is ok
-                return
-            try:
-                DocumentType.objects.get(id=document_type_id)
-            except DocumentType.DoesNotExist:
-                raise serializers.ValidationError("Document type does not exist")
-        else:
-            raise serializers.ValidationError("document_type not specified")
-
-    def _validate_parameters_correspondent(self, parameters):
-        if "correspondent" in parameters:
-            correspondent_id = parameters["correspondent"]
-            if correspondent_id is None:
-                return
-            try:
-                Correspondent.objects.get(id=correspondent_id)
-            except Correspondent.DoesNotExist:
-                raise serializers.ValidationError("Correspondent does not exist")
-        else:
-            raise serializers.ValidationError("correspondent not specified")
-
-    def _validate_storage_path(self, parameters):
-        if "storage_path" in parameters:
-            storage_path_id = parameters["storage_path"]
-            if storage_path_id is None:
-                return
-            try:
-                StoragePath.objects.get(id=storage_path_id)
-            except StoragePath.DoesNotExist:
-                raise serializers.ValidationError(
-                    "Storage path does not exist",
-                )
-        else:
-            raise serializers.ValidationError("storage path not specified")
-
-    def _validate_parameters_modify_tags(self, parameters):
-        if "add_tags" in parameters:
-            self._validate_tag_id_list(parameters["add_tags"], "add_tags")
-        else:
-            raise serializers.ValidationError("add_tags not specified")
-
-        if "remove_tags" in parameters:
-            self._validate_tag_id_list(parameters["remove_tags"], "remove_tags")
-        else:
-            raise serializers.ValidationError("remove_tags not specified")
-
-    def _validate_parameters_modify_custom_fields(self, parameters):
-        if "add_custom_fields" in parameters:
-            self._validate_custom_field_id_list_or_dict(
-                parameters["add_custom_fields"],
-                "add_custom_fields",
-            )
-        else:
-            raise serializers.ValidationError("add_custom_fields not specified")
-
-        if "remove_custom_fields" in parameters:
-            self._validate_custom_field_id_list_or_dict(
-                parameters["remove_custom_fields"],
-                "remove_custom_fields",
-            )
-        else:
-            raise serializers.ValidationError("remove_custom_fields not specified")
-
-    def _validate_owner(self, owner):
-        ownerUser = User.objects.get(pk=owner)
-        if ownerUser is None:
-            raise serializers.ValidationError("Specified owner cannot be found")
-        return ownerUser
-
-    def _validate_parameters_set_permissions(self, parameters):
-        parameters["set_permissions"] = self.validate_set_permissions(
-            parameters["set_permissions"],
-        )
-        if "owner" in parameters and parameters["owner"] is not None:
-            self._validate_owner(parameters["owner"])
-        if "merge" not in parameters:
-            parameters["merge"] = False
-
-    def _validate_parameters_rotate(self, parameters):
-        try:
-            if (
-                "degrees" not in parameters
-                or not float(parameters["degrees"]).is_integer()
-            ):
-                raise serializers.ValidationError("invalid rotation degrees")
-        except ValueError:
-            raise serializers.ValidationError("invalid rotation degrees")
-
-    def _validate_parameters_split(self, parameters):
-        if "pages" not in parameters:
-            raise serializers.ValidationError("pages not specified")
-        try:
-            pages = []
-            docs = parameters["pages"].split(",")
-            for doc in docs:
-                if "-" in doc:
-                    pages.append(
-                        [
-                            x
-                            for x in range(
-                                int(doc.split("-")[0]),
-                                int(doc.split("-")[1]) + 1,
-                            )
-                        ],
-                    )
-                else:
-                    pages.append([int(doc)])
-            parameters["pages"] = pages
-        except ValueError:
-            raise serializers.ValidationError("invalid pages specified")
-
-        if "delete_originals" in parameters:
-            if not isinstance(parameters["delete_originals"], bool):
-                raise serializers.ValidationError("delete_originals must be a boolean")
-        else:
-            parameters["delete_originals"] = False
-
-    def _validate_parameters_delete_pages(self, parameters):
-        if "pages" not in parameters:
-            raise serializers.ValidationError("pages not specified")
-        if not isinstance(parameters["pages"], list):
-            raise serializers.ValidationError("pages must be a list")
-        if not all(isinstance(i, int) for i in parameters["pages"]):
-            raise serializers.ValidationError("pages must be a list of integers")
-
-    def _validate_parameters_merge(self, parameters):
-        if "delete_originals" in parameters:
-            if not isinstance(parameters["delete_originals"], bool):
-                raise serializers.ValidationError("delete_originals must be a boolean")
-        else:
-            parameters["delete_originals"] = False
-        if "archive_fallback" in parameters:
-            if not isinstance(parameters["archive_fallback"], bool):
-                raise serializers.ValidationError("archive_fallback must be a boolean")
-        else:
-            parameters["archive_fallback"] = False
-
-    def validate(self, attrs):
-        method = attrs["method"]
-        parameters = attrs["parameters"]
-
-        if method == bulk_edit.set_correspondent:
-            self._validate_parameters_correspondent(parameters)
-        elif method == bulk_edit.set_document_type:
-            self._validate_parameters_document_type(parameters)
-        elif method == bulk_edit.add_tag or method == bulk_edit.remove_tag:
-            self._validate_parameters_tags(parameters)
-        elif method == bulk_edit.modify_tags:
-            self._validate_parameters_modify_tags(parameters)
-        elif method == bulk_edit.set_storage_path:
-            self._validate_storage_path(parameters)
-        elif method == bulk_edit.modify_custom_fields:
-            self._validate_parameters_modify_custom_fields(parameters)
-        elif method == bulk_edit.set_permissions:
-            self._validate_parameters_set_permissions(parameters)
-        elif method == bulk_edit.rotate:
-            self._validate_parameters_rotate(parameters)
-        elif method == bulk_edit.split:
-            if len(attrs["documents"]) > 1:
-                raise serializers.ValidationError(
-                    "Split method only supports one document",
-                )
-            self._validate_parameters_split(parameters)
-        elif method == bulk_edit.delete_pages:
-            if len(attrs["documents"]) > 1:
-                raise serializers.ValidationError(
-                    "Delete pages method only supports one document",
-                )
-            self._validate_parameters_delete_pages(parameters)
-        elif method == bulk_edit.merge:
-            self._validate_parameters_merge(parameters)
-
-        return attrs
-
-
-class PostDocumentSerializer(serializers.Serializer):
-    created = serializers.DateTimeField(
-        label="Created",
-        allow_null=True,
-        write_only=True,
-        required=False,
-    )
-
-    document = serializers.FileField(
-        label="Document",
-        write_only=True,
-    )
-
-    title = serializers.CharField(
-        label="Title",
-        write_only=True,
-        required=False,
-    )
-
-    correspondent = serializers.PrimaryKeyRelatedField(
-        queryset=Correspondent.objects.all(),
-        label="Correspondent",
-        allow_null=True,
-        write_only=True,
-        required=False,
-    )
-
-    document_type = serializers.PrimaryKeyRelatedField(
-        queryset=DocumentType.objects.all(),
-        label="Document type",
-        allow_null=True,
-        write_only=True,
-        required=False,
-    )
-
-    storage_path = serializers.PrimaryKeyRelatedField(
-        queryset=StoragePath.objects.all(),
-        label="Storage path",
-        allow_null=True,
-        write_only=True,
-        required=False,
-    )
-
-    tags = serializers.PrimaryKeyRelatedField(
-        many=True,
-        queryset=Tag.objects.all(),
-        label="Tags",
-        write_only=True,
-        required=False,
-    )
-
-    archive_serial_number = serializers.IntegerField(
-        label="ASN",
-        write_only=True,
-        required=False,
-        min_value=Document.ARCHIVE_SERIAL_NUMBER_MIN,
-        max_value=Document.ARCHIVE_SERIAL_NUMBER_MAX,
-    )
-
-    custom_fields = serializers.PrimaryKeyRelatedField(
-        many=True,
-        queryset=CustomField.objects.all(),
-        label="Custom fields",
-        write_only=True,
-        required=False,
-    )
-
-    from_webui = serializers.BooleanField(
-        label="Documents are from Paperless-ngx WebUI",
-        write_only=True,
-        required=False,
-    )
-
-    def validate_document(self, document):
-        document_data = document.file.read()
-        mime_type = magic.from_buffer(document_data, mime=True)
-
-        if not is_mime_type_supported(mime_type):
-            if (
-                mime_type in settings.CONSUMER_PDF_RECOVERABLE_MIME_TYPES
-                and document.name.endswith(
-                    ".pdf",
-                )
-            ):
-                # If the file is an invalid PDF, we can try to recover it later in the consumer
-                mime_type = "application/pdf"
-            else:
-                raise serializers.ValidationError(
-                    _("File type %(type)s not supported") % {"type": mime_type},
-                )
-
-        return document.name, document_data
-
-    def validate_correspondent(self, correspondent):
-        if correspondent:
-            return correspondent.id
-        else:
-            return None
-
-    def validate_document_type(self, document_type):
-        if document_type:
-            return document_type.id
-        else:
-            return None
-
-    def validate_storage_path(self, storage_path):
-        if storage_path:
-            return storage_path.id
-        else:
-            return None
-
-    def validate_tags(self, tags):
-        if tags:
-            return [tag.id for tag in tags]
-        else:
-            return None
-
-    def validate_custom_fields(self, custom_fields):
-        if custom_fields:
-            return [custom_field.id for custom_field in custom_fields]
-        else:
-            return None
-
-
-class BulkDownloadSerializer(DocumentListSerializer):
-    content = serializers.ChoiceField(
-        choices=["archive", "originals", "both"],
-        default="archive",
-    )
-
-    compression = serializers.ChoiceField(
-        choices=["none", "deflated", "bzip2", "lzma"],
-        default="none",
-    )
-
-    follow_formatting = serializers.BooleanField(
-        default=False,
-    )
-
-    def validate_compression(self, compression):
-        import zipfile
-
-        return {
-            "none": zipfile.ZIP_STORED,
-            "deflated": zipfile.ZIP_DEFLATED,
-            "bzip2": zipfile.ZIP_BZIP2,
-            "lzma": zipfile.ZIP_LZMA,
-        }[compression]
-
-
-class StoragePathSerializer(MatchingModelSerializer, OwnedObjectSerializer):
-    class Meta:
-        model = StoragePath
-        fields = (
-            "id",
-            "slug",
-            "name",
-            "path",
-            "match",
-            "matching_algorithm",
-            "is_insensitive",
-            "document_count",
-            "owner",
-            "permissions",
-            "user_can_change",
-            "set_permissions",
-        )
-
-    def validate_path(self, path: str):
-        converted_path = convert_format_str_to_template_format(path)
-        if converted_path != path:
-            logger.warning(
-                f"Storage path {path} is not using the new style format, consider updating",
-            )
-        result = validate_filepath_template_and_render(converted_path)
-
-        if result is None:
-            raise serializers.ValidationError(_("Invalid variable detected."))
-
-        return converted_path
-
-    def update(self, instance, validated_data):
-        """
-        When a storage path is updated, see if documents
-        using it require a rename/move
-        """
-        doc_ids = [doc.id for doc in instance.documents.all()]
-        if len(doc_ids):
-            bulk_edit.bulk_update_documents.delay(doc_ids)
-
-        return super().update(instance, validated_data)
-
-
-class UiSettingsViewSerializer(serializers.ModelSerializer):
-    class Meta:
-        model = UiSettings
-        depth = 1
-        fields = [
-            "id",
-            "settings",
-        ]
-
-    def validate_settings(self, settings):
-        # we never save update checking backend setting
-        if "update_checking" in settings:
-            try:
-                settings["update_checking"].pop("backend_setting")
-            except KeyError:
-                pass
-        return settings
-
-    def create(self, validated_data):
-        ui_settings = UiSettings.objects.update_or_create(
-            user=validated_data.get("user"),
-            defaults={"settings": validated_data.get("settings", None)},
-        )
-        return ui_settings
-
-
-class TasksViewSerializer(OwnedObjectSerializer):
-    class Meta:
-        model = PaperlessTask
-        fields = (
-            "id",
-            "task_id",
-            "task_name",
-            "task_file_name",
-            "date_created",
-            "date_done",
-            "type",
-            "status",
-            "result",
-            "acknowledged",
-            "related_document",
-            "owner",
-        )
-
-    related_document = serializers.SerializerMethodField()
-    created_doc_re = re.compile(r"New document id (\d+) created")
-    duplicate_doc_re = re.compile(r"It is a duplicate of .* \(#(\d+)\)")
-
-    def get_related_document(self, obj) -> str | None:
-        result = None
-        re = None
-        if obj.result:
-            match obj.status:
-                case states.SUCCESS:
-                    re = self.created_doc_re
-                case states.FAILURE:
-                    re = (
-                        self.duplicate_doc_re
-                        if "existing document is in the trash" not in obj.result
-                        else None
-                    )
-            if re is not None:
-                try:
-                    result = re.search(obj.result).group(1)
-                except Exception:
-                    pass
-
-        return result
-
-
-class RunTaskViewSerializer(serializers.Serializer):
-    task_name = serializers.ChoiceField(
-        choices=PaperlessTask.TaskName.choices,
-        label="Task Name",
-        write_only=True,
-    )
-
-
-class AcknowledgeTasksViewSerializer(serializers.Serializer):
-    tasks = serializers.ListField(
-        required=True,
-        label="Tasks",
-        write_only=True,
-        child=serializers.IntegerField(),
-    )
-
-    def _validate_task_id_list(self, tasks, name="tasks"):
-        if not isinstance(tasks, list):
-            raise serializers.ValidationError(f"{name} must be a list")
-        if not all(isinstance(i, int) for i in tasks):
-            raise serializers.ValidationError(f"{name} must be a list of integers")
-        count = PaperlessTask.objects.filter(id__in=tasks).count()
-        if not count == len(tasks):
-            raise serializers.ValidationError(
-                f"Some tasks in {name} don't exist or were specified twice.",
-            )
-
-    def validate_tasks(self, tasks):
-        self._validate_task_id_list(tasks)
-        return tasks
-
-
-class ShareLinkSerializer(OwnedObjectSerializer):
-    class Meta:
-        model = ShareLink
-        fields = (
-            "id",
-            "created",
-            "expiration",
-            "slug",
-            "document",
-            "file_version",
-        )
-
-    def create(self, validated_data):
-        validated_data["slug"] = get_random_string(50)
-        return super().create(validated_data)
-
-
-class BulkEditObjectsSerializer(SerializerWithPerms, SetPermissionsMixin):
-    objects = serializers.ListField(
-        required=True,
-        allow_empty=False,
-        label="Objects",
-        write_only=True,
-        child=serializers.IntegerField(),
-    )
-
-    object_type = serializers.ChoiceField(
-        choices=[
-            "tags",
-            "correspondents",
-            "document_types",
-            "storage_paths",
-        ],
-        label="Object Type",
-        write_only=True,
-    )
-
-    operation = serializers.ChoiceField(
-        choices=[
-            "set_permissions",
-            "delete",
-        ],
-        label="Operation",
-        required=True,
-        write_only=True,
-    )
-
-    owner = serializers.PrimaryKeyRelatedField(
-        queryset=User.objects.all(),
-        required=False,
-        allow_null=True,
-    )
-
-    permissions = serializers.DictField(
-        label="Set permissions",
-        allow_empty=False,
-        required=False,
-        write_only=True,
-    )
-
-    merge = serializers.BooleanField(
-        default=False,
-        write_only=True,
-        required=False,
-    )
-
-    def get_object_class(self, object_type):
-        object_class = None
-        if object_type == "tags":
-            object_class = Tag
-        elif object_type == "correspondents":
-            object_class = Correspondent
-        elif object_type == "document_types":
-            object_class = DocumentType
-        elif object_type == "storage_paths":
-            object_class = StoragePath
-        return object_class
-
-    def _validate_objects(self, objects, object_type):
-        if not isinstance(objects, list):
-            raise serializers.ValidationError("objects must be a list")
-        if not all(isinstance(i, int) for i in objects):
-            raise serializers.ValidationError("objects must be a list of integers")
-        object_class = self.get_object_class(object_type)
-        count = object_class.objects.filter(id__in=objects).count()
-        if not count == len(objects):
-            raise serializers.ValidationError(
-                "Some ids in objects don't exist or were specified twice.",
-            )
-        return objects
-
-    def _validate_permissions(self, permissions):
-        self.validate_set_permissions(
-            permissions,
-        )
-
-    def validate(self, attrs):
-        object_type = attrs["object_type"]
-        objects = attrs["objects"]
-        operation = attrs.get("operation")
-
-        self._validate_objects(objects, object_type)
-
-        if operation == "set_permissions":
-            permissions = attrs.get("permissions")
-            if permissions is not None:
-                self._validate_permissions(permissions)
-
-        return attrs
-
-
-class WorkflowTriggerSerializer(serializers.ModelSerializer):
-    id = serializers.IntegerField(required=False, allow_null=True)
-    sources = fields.MultipleChoiceField(
-        choices=WorkflowTrigger.DocumentSourceChoices.choices,
-        allow_empty=True,
-        default={
-            DocumentSource.ConsumeFolder,
-            DocumentSource.ApiUpload,
-            DocumentSource.MailFetch,
-        },
-    )
-
-    type = serializers.ChoiceField(
-        choices=WorkflowTrigger.WorkflowTriggerType.choices,
-        label="Trigger Type",
-    )
-
-    class Meta:
-        model = WorkflowTrigger
-        fields = [
-            "id",
-            "sources",
-            "type",
-            "filter_path",
-            "filter_filename",
-            "filter_mailrule",
-            "matching_algorithm",
-            "match",
-            "is_insensitive",
-            "filter_has_tags",
-            "filter_has_correspondent",
-            "filter_has_document_type",
-            "schedule_offset_days",
-            "schedule_is_recurring",
-            "schedule_recurring_interval_days",
-            "schedule_date_field",
-            "schedule_date_custom_field",
-        ]
-
-    def validate(self, attrs):
-        # Empty strings treated as None to avoid unexpected behavior
-        if (
-            "filter_filename" in attrs
-            and attrs["filter_filename"] is not None
-            and len(attrs["filter_filename"]) == 0
-        ):
-            attrs["filter_filename"] = None
-        if (
-            "filter_path" in attrs
-            and attrs["filter_path"] is not None
-            and len(attrs["filter_path"]) == 0
-        ):
-            attrs["filter_path"] = None
-
-        if (
-            attrs["type"] == WorkflowTrigger.WorkflowTriggerType.CONSUMPTION
-            and "filter_mailrule" not in attrs
-            and ("filter_filename" not in attrs or attrs["filter_filename"] is None)
-            and ("filter_path" not in attrs or attrs["filter_path"] is None)
-        ):
-            raise serializers.ValidationError(
-                "File name, path or mail rule filter are required",
-            )
-
-        return attrs
-
-
-class WorkflowActionEmailSerializer(serializers.ModelSerializer):
-    id = serializers.IntegerField(allow_null=True, required=False)
-
-    class Meta:
-        model = WorkflowActionEmail
-        fields = [
-            "id",
-            "subject",
-            "body",
-            "to",
-            "include_document",
-        ]
-
-
-class WorkflowActionWebhookSerializer(serializers.ModelSerializer):
-    id = serializers.IntegerField(allow_null=True, required=False)
-
-    def validate_url(self, url):
-        url_validator(url)
-        return url
-
-    class Meta:
-        model = WorkflowActionWebhook
-        fields = [
-            "id",
-            "url",
-            "use_params",
-            "as_json",
-            "params",
-            "body",
-            "headers",
-            "include_document",
-        ]
-
-
-class WorkflowActionSerializer(serializers.ModelSerializer):
-    id = serializers.IntegerField(required=False, allow_null=True)
-    assign_correspondent = CorrespondentField(allow_null=True, required=False)
-    assign_tags = TagsField(many=True, allow_null=True, required=False)
-    assign_document_type = DocumentTypeField(allow_null=True, required=False)
-    assign_storage_path = StoragePathField(allow_null=True, required=False)
-    email = WorkflowActionEmailSerializer(allow_null=True, required=False)
-    webhook = WorkflowActionWebhookSerializer(allow_null=True, required=False)
-
-    class Meta:
-        model = WorkflowAction
-        fields = [
-            "id",
-            "type",
-            "assign_title",
-            "assign_tags",
-            "assign_correspondent",
-            "assign_document_type",
-            "assign_storage_path",
-            "assign_owner",
-            "assign_view_users",
-            "assign_view_groups",
-            "assign_change_users",
-            "assign_change_groups",
-            "assign_custom_fields",
-            "assign_custom_fields_values",
-            "remove_all_tags",
-            "remove_tags",
-            "remove_all_correspondents",
-            "remove_correspondents",
-            "remove_all_document_types",
-            "remove_document_types",
-            "remove_all_storage_paths",
-            "remove_storage_paths",
-            "remove_custom_fields",
-            "remove_all_custom_fields",
-            "remove_all_owners",
-            "remove_owners",
-            "remove_all_permissions",
-            "remove_view_users",
-            "remove_view_groups",
-            "remove_change_users",
-            "remove_change_groups",
-            "email",
-            "webhook",
-        ]
-
-    def validate(self, attrs):
-        if "assign_title" in attrs and attrs["assign_title"] is not None:
-            if len(attrs["assign_title"]) == 0:
-                # Empty strings treated as None to avoid unexpected behavior
-                attrs["assign_title"] = None
-            else:
-                try:
-                    # test against all placeholders, see consumer.py `parse_doc_title_w_placeholders`
-                    attrs["assign_title"].format(
-                        correspondent="",
-                        document_type="",
-                        added="",
-                        added_year="",
-                        added_year_short="",
-                        added_month="",
-                        added_month_name="",
-                        added_month_name_short="",
-                        added_day="",
-                        added_time="",
-                        owner_username="",
-                        original_filename="",
-                        filename="",
-                        created="",
-                        created_year="",
-                        created_year_short="",
-                        created_month="",
-                        created_month_name="",
-                        created_month_name_short="",
-                        created_day="",
-                        created_time="",
-                    )
-                except (ValueError, KeyError) as e:
-                    raise serializers.ValidationError(
-                        {"assign_title": f'Invalid f-string detected: "{e.args[0]}"'},
-                    )
-
-        if (
-            "type" in attrs
-            and attrs["type"] == WorkflowAction.WorkflowActionType.EMAIL
-            and "email" not in attrs
-        ):
-            raise serializers.ValidationError(
-                "Email data is required for email actions",
-            )
-
-        if (
-            "type" in attrs
-            and attrs["type"] == WorkflowAction.WorkflowActionType.WEBHOOK
-            and "webhook" not in attrs
-        ):
-            raise serializers.ValidationError(
-                "Webhook data is required for webhook actions",
-            )
-
-        return attrs
-
-
-class WorkflowSerializer(serializers.ModelSerializer):
-    order = serializers.IntegerField(required=False)
-
-    triggers = WorkflowTriggerSerializer(many=True)
-    actions = WorkflowActionSerializer(many=True)
-
-    class Meta:
-        model = Workflow
-        fields = [
-            "id",
-            "name",
-            "order",
-            "enabled",
-            "triggers",
-            "actions",
-        ]
-
-    def update_triggers_and_actions(self, instance: Workflow, triggers, actions):
-        set_triggers = []
-        set_actions = []
-
-        if triggers is not None:
-            for trigger in triggers:
-                filter_has_tags = trigger.pop("filter_has_tags", None)
-                trigger_instance, _ = WorkflowTrigger.objects.update_or_create(
-                    id=trigger.get("id"),
-                    defaults=trigger,
-                )
-                if filter_has_tags is not None:
-                    trigger_instance.filter_has_tags.set(filter_has_tags)
-                set_triggers.append(trigger_instance)
-
-        if actions is not None:
-            for action in actions:
-                assign_tags = action.pop("assign_tags", None)
-                assign_view_users = action.pop("assign_view_users", None)
-                assign_view_groups = action.pop("assign_view_groups", None)
-                assign_change_users = action.pop("assign_change_users", None)
-                assign_change_groups = action.pop("assign_change_groups", None)
-                assign_custom_fields = action.pop("assign_custom_fields", None)
-                remove_tags = action.pop("remove_tags", None)
-                remove_correspondents = action.pop("remove_correspondents", None)
-                remove_document_types = action.pop("remove_document_types", None)
-                remove_storage_paths = action.pop("remove_storage_paths", None)
-                remove_custom_fields = action.pop("remove_custom_fields", None)
-                remove_owners = action.pop("remove_owners", None)
-                remove_view_users = action.pop("remove_view_users", None)
-                remove_view_groups = action.pop("remove_view_groups", None)
-                remove_change_users = action.pop("remove_change_users", None)
-                remove_change_groups = action.pop("remove_change_groups", None)
-
-                email_data = action.pop("email", None)
-                webhook_data = action.pop("webhook", None)
-
-                action_instance, _ = WorkflowAction.objects.update_or_create(
-                    id=action.get("id"),
-                    defaults=action,
-                )
-
-                if email_data is not None:
-                    serializer = WorkflowActionEmailSerializer(data=email_data)
-                    serializer.is_valid(raise_exception=True)
-                    email, _ = WorkflowActionEmail.objects.update_or_create(
-                        id=email_data.get("id"),
-                        defaults=serializer.validated_data,
-                    )
-                    action_instance.email = email
-                    action_instance.save()
-
-                if webhook_data is not None:
-                    serializer = WorkflowActionWebhookSerializer(data=webhook_data)
-                    serializer.is_valid(raise_exception=True)
-                    webhook, _ = WorkflowActionWebhook.objects.update_or_create(
-                        id=webhook_data.get("id"),
-                        defaults=serializer.validated_data,
-                    )
-                    action_instance.webhook = webhook
-                    action_instance.save()
-
-                if assign_tags is not None:
-                    action_instance.assign_tags.set(assign_tags)
-                if assign_view_users is not None:
-                    action_instance.assign_view_users.set(assign_view_users)
-                if assign_view_groups is not None:
-                    action_instance.assign_view_groups.set(assign_view_groups)
-                if assign_change_users is not None:
-                    action_instance.assign_change_users.set(assign_change_users)
-                if assign_change_groups is not None:
-                    action_instance.assign_change_groups.set(assign_change_groups)
-                if assign_custom_fields is not None:
-                    action_instance.assign_custom_fields.set(assign_custom_fields)
-                if remove_tags is not None:
-                    action_instance.remove_tags.set(remove_tags)
-                if remove_correspondents is not None:
-                    action_instance.remove_correspondents.set(remove_correspondents)
-                if remove_document_types is not None:
-                    action_instance.remove_document_types.set(remove_document_types)
-                if remove_storage_paths is not None:
-                    action_instance.remove_storage_paths.set(remove_storage_paths)
-                if remove_custom_fields is not None:
-                    action_instance.remove_custom_fields.set(remove_custom_fields)
-                if remove_owners is not None:
-                    action_instance.remove_owners.set(remove_owners)
-                if remove_view_users is not None:
-                    action_instance.remove_view_users.set(remove_view_users)
-                if remove_view_groups is not None:
-                    action_instance.remove_view_groups.set(remove_view_groups)
-                if remove_change_users is not None:
-                    action_instance.remove_change_users.set(remove_change_users)
-                if remove_change_groups is not None:
-                    action_instance.remove_change_groups.set(remove_change_groups)
-
-                set_actions.append(action_instance)
-
-        instance.triggers.set(set_triggers)
-        instance.actions.set(set_actions)
-        instance.save()
-
-    def prune_triggers_and_actions(self):
-        """
-        ManyToMany fields dont support e.g. on_delete so we need to discard unattached
-        triggers and actionas manually
-        """
-        for trigger in WorkflowTrigger.objects.all():
-            if trigger.workflows.all().count() == 0:
-                trigger.delete()
-
-        for action in WorkflowAction.objects.all():
-            if action.workflows.all().count() == 0:
-                action.delete()
-
-        WorkflowActionEmail.objects.filter(action=None).delete()
-        WorkflowActionWebhook.objects.filter(action=None).delete()
-
-    def create(self, validated_data) -> Workflow:
-        if "triggers" in validated_data:
-            triggers = validated_data.pop("triggers")
-
-        if "actions" in validated_data:
-            actions = validated_data.pop("actions")
-
-        instance = super().create(validated_data)
-
-        self.update_triggers_and_actions(instance, triggers, actions)
-
-        return instance
-
-    def update(self, instance: Workflow, validated_data) -> Workflow:
-        if "triggers" in validated_data:
-            triggers = validated_data.pop("triggers")
-
-        if "actions" in validated_data:
-            actions = validated_data.pop("actions")
-
-        instance = super().update(instance, validated_data)
-
-        self.update_triggers_and_actions(instance, triggers, actions)
-
-        self.prune_triggers_and_actions()
-
-        return instance
-
-
-class TrashSerializer(SerializerWithPerms):
-    documents = serializers.ListField(
-        required=False,
-        label="Documents",
-        write_only=True,
-        child=serializers.IntegerField(),
-    )
-
-    action = serializers.ChoiceField(
-        choices=["restore", "empty"],
-        label="Action",
-        write_only=True,
-    )
-
-    def validate_documents(self, documents):
-        count = Document.deleted_objects.filter(id__in=documents).count()
-        if not count == len(documents):
-            raise serializers.ValidationError(
-                "Some documents in the list have not yet been deleted.",
-            )
-        return documents
-
-
-class StoragePathTestSerializer(SerializerWithPerms):
-    path = serializers.CharField(
-        required=True,
-        label="Path",
-        write_only=True,
-    )
-
-    document = serializers.PrimaryKeyRelatedField(
-        queryset=Document.objects.all(),
-        required=True,
-        label="Document",
-        write_only=True,
-    )
index d6737dcd024393d19d32726b3469ee6399a83816..c5e0ca0ce6d475a60e7d4acfd5a53895741e40a0 100644 (file)
@@ -7,10 +7,10 @@ from urllib.parse import quote
 from django.contrib.auth.models import User
 from rest_framework.test import APITestCase
 
-from documents.serialisers import DocumentSerializer
 from documents.tests.utils import DirectoriesMixin
 from paperless.models import CustomField
 from paperless.models import Document
+from paperless.serialisers import DocumentSerializer
 
 
 class DocumentWrapper:
index 461eef587230ae1ce6d0e329e76612d2631a29ef..7bad65f943a468e125e1e9ddd87bddd4c5854ff2 100644 (file)
@@ -1,19 +1,95 @@
+from __future__ import annotations
+
+import datetime
 import logging
+import math
+import re
+import zoneinfo
+from decimal import Decimal
+from typing import TYPE_CHECKING
+
+import magic
+from celery import states
+from django.conf import settings
+from django.contrib.auth.models import Group
+from django.contrib.auth.models import User
+from django.contrib.contenttypes.models import ContentType
+from django.core.validators import DecimalValidator
+from django.core.validators import MaxLengthValidator
+from django.core.validators import RegexValidator
+from django.core.validators import integer_validator
+from django.utils.crypto import get_random_string
+from django.utils.text import slugify
+from django.utils.translation import gettext as _
+from drf_spectacular.utils import extend_schema_field
+from drf_writable_nested.serializers import NestedUpdateMixin
+from guardian.core import ObjectPermissionChecker
+from guardian.shortcuts import get_users_with_perms
+from guardian.utils import get_group_obj_perms_model
+from guardian.utils import get_user_obj_perms_model
+from rest_framework import fields
+from rest_framework import serializers
+from rest_framework.fields import SerializerMethodField
+
+if settings.AUDIT_LOG_ENABLED:
+    from auditlog.context import set_actor
+
+
+from documents import bulk_edit
+from documents.parsers import is_mime_type_supported
+from documents.permissions import get_groups_with_only_permission
+from documents.permissions import set_permissions_for_object
+from documents.templating.filepath import validate_filepath_template_and_render
+from documents.templating.utils import convert_format_str_to_template_format
+from paperless.data_models import DocumentSource
+from paperless.models import Correspondent
+from paperless.models import CustomField
+from paperless.models import CustomFieldInstance
+from paperless.models import Document
+from paperless.models import DocumentType
+from paperless.models import MatchingModel
+from paperless.models import Note
+from paperless.models import PaperlessTask
+from paperless.models import SavedView
+from paperless.models import SavedViewFilterRule
+from paperless.models import ShareLink
+from paperless.models import StoragePath
+from paperless.models import Tag
+from paperless.models import UiSettings
+from paperless.models import Workflow
+from paperless.models import WorkflowAction
+from paperless.models import WorkflowActionEmail
+from paperless.models import WorkflowActionWebhook
+from paperless.models import WorkflowTrigger
+from paperless.validators import uri_validator
+from paperless.validators import url_validator
+
+if TYPE_CHECKING:
+    from collections.abc import Iterable
+
 
 from allauth.mfa.adapter import get_adapter as get_mfa_adapter
 from allauth.mfa.models import Authenticator
 from allauth.mfa.totp.internal.auth import TOTP
 from allauth.socialaccount.models import SocialAccount
-from django.contrib.auth.models import Group
 from django.contrib.auth.models import Permission
-from django.contrib.auth.models import User
-from rest_framework import serializers
 from rest_framework.authtoken.serializers import AuthTokenSerializer
 
 from paperless.models import ApplicationConfiguration
-from paperless_mail.serialisers import ObfuscatedPasswordField
 
-logger = logging.getLogger("paperless.settings")
+logger = logging.getLogger("paperless.serializers")
+
+
+class ObfuscatedPasswordField(serializers.CharField):
+    """
+    Sends *** string instead of password in the clear
+    """
+
+    def to_representation(self, value) -> str:
+        return "*" * max(10, len(value))
+
+    def to_internal_value(self, data):
+        return data
 
 
 class PaperlessAuthTokenSerializer(AuthTokenSerializer):
@@ -202,3 +278,2255 @@ class ApplicationConfigurationSerializer(serializers.ModelSerializer):
     class Meta:
         model = ApplicationConfiguration
         fields = "__all__"
+
+
+# https://www.django-rest-framework.org/api-guide/serializers/#example
+class DynamicFieldsModelSerializer(serializers.ModelSerializer):
+    """
+    A ModelSerializer that takes an additional `fields` argument that
+    controls which fields should be displayed.
+    """
+
+    def __init__(self, *args, **kwargs):
+        # Don't pass the 'fields' arg up to the superclass
+        fields = kwargs.pop("fields", None)
+
+        # Instantiate the superclass normally
+        super().__init__(*args, **kwargs)
+
+        if fields is not None:
+            # Drop any fields that are not specified in the `fields` argument.
+            allowed = set(fields)
+            existing = set(self.fields)
+            for field_name in existing - allowed:
+                self.fields.pop(field_name)
+
+
+class MatchingModelSerializer(serializers.ModelSerializer):
+    document_count = serializers.IntegerField(read_only=True)
+
+    def get_slug(self, obj) -> str:
+        return slugify(obj.name)
+
+    slug = SerializerMethodField()
+
+    def validate(self, data):
+        # TODO: remove pending https://github.com/encode/django-rest-framework/issues/7173
+        name = data.get(
+            "name",
+            self.instance.name if hasattr(self.instance, "name") else None,
+        )
+        owner = (
+            data["owner"]
+            if "owner" in data
+            else self.user
+            if hasattr(self, "user")
+            else None
+        )
+        pk = self.instance.pk if hasattr(self.instance, "pk") else None
+        if ("name" in data or "owner" in data) and self.Meta.model.objects.filter(
+            name=name,
+            owner=owner,
+        ).exclude(pk=pk).exists():
+            raise serializers.ValidationError(
+                {"error": "Object violates owner / name unique constraint"},
+            )
+        return data
+
+    def validate_match(self, match):
+        if (
+            "matching_algorithm" in self.initial_data
+            and self.initial_data["matching_algorithm"] == MatchingModel.MATCH_REGEX
+        ):
+            try:
+                re.compile(match)
+            except re.error as e:
+                raise serializers.ValidationError(
+                    _("Invalid regular expression: %(error)s") % {"error": str(e.msg)},
+                )
+        return match
+
+
+class SetPermissionsMixin:
+    def _validate_user_ids(self, user_ids):
+        users = User.objects.none()
+        if user_ids is not None:
+            users = User.objects.filter(id__in=user_ids)
+            if not users.count() == len(user_ids):
+                raise serializers.ValidationError(
+                    "Some users in don't exist or were specified twice.",
+                )
+        return users
+
+    def _validate_group_ids(self, group_ids):
+        groups = Group.objects.none()
+        if group_ids is not None:
+            groups = Group.objects.filter(id__in=group_ids)
+            if not groups.count() == len(group_ids):
+                raise serializers.ValidationError(
+                    "Some groups in don't exist or were specified twice.",
+                )
+        return groups
+
+    def validate_set_permissions(self, set_permissions=None):
+        permissions_dict = {
+            "view": {},
+            "change": {},
+        }
+        if set_permissions is not None:
+            for action in ["view", "change"]:
+                if action in set_permissions:
+                    if "users" in set_permissions[action]:
+                        users = set_permissions[action]["users"]
+                        permissions_dict[action]["users"] = self._validate_user_ids(
+                            users,
+                        )
+                    if "groups" in set_permissions[action]:
+                        groups = set_permissions[action]["groups"]
+                        permissions_dict[action]["groups"] = self._validate_group_ids(
+                            groups,
+                        )
+                else:
+                    del permissions_dict[action]
+        return permissions_dict
+
+    def _set_permissions(self, permissions, object):
+        set_permissions_for_object(permissions, object)
+
+
+class SerializerWithPerms(serializers.Serializer):
+    def __init__(self, *args, **kwargs):
+        self.user = kwargs.pop("user", None)
+        self.full_perms = kwargs.pop("full_perms", False)
+        self.all_fields = kwargs.pop("all_fields", False)
+        super().__init__(*args, **kwargs)
+
+
+@extend_schema_field(
+    field={
+        "type": "object",
+        "properties": {
+            "view": {
+                "type": "object",
+                "properties": {
+                    "users": {
+                        "type": "array",
+                        "items": {"type": "integer"},
+                    },
+                    "groups": {
+                        "type": "array",
+                        "items": {"type": "integer"},
+                    },
+                },
+            },
+            "change": {
+                "type": "object",
+                "properties": {
+                    "users": {
+                        "type": "array",
+                        "items": {"type": "integer"},
+                    },
+                    "groups": {
+                        "type": "array",
+                        "items": {"type": "integer"},
+                    },
+                },
+            },
+        },
+    },
+)
+class SetPermissionsSerializer(serializers.DictField):
+    pass
+
+
+class OwnedObjectSerializer(
+    SerializerWithPerms,
+    serializers.ModelSerializer,
+    SetPermissionsMixin,
+):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+        if not self.all_fields:
+            try:
+                if self.full_perms:
+                    self.fields.pop("user_can_change")
+                    self.fields.pop("is_shared_by_requester")
+                else:
+                    self.fields.pop("permissions")
+            except KeyError:
+                pass
+
+    @extend_schema_field(
+        field={
+            "type": "object",
+            "properties": {
+                "view": {
+                    "type": "object",
+                    "properties": {
+                        "users": {
+                            "type": "array",
+                            "items": {"type": "integer"},
+                        },
+                        "groups": {
+                            "type": "array",
+                            "items": {"type": "integer"},
+                        },
+                    },
+                },
+                "change": {
+                    "type": "object",
+                    "properties": {
+                        "users": {
+                            "type": "array",
+                            "items": {"type": "integer"},
+                        },
+                        "groups": {
+                            "type": "array",
+                            "items": {"type": "integer"},
+                        },
+                    },
+                },
+            },
+        },
+    )
+    def get_permissions(self, obj) -> dict:
+        view_codename = f"view_{obj.__class__.__name__.lower()}"
+        change_codename = f"change_{obj.__class__.__name__.lower()}"
+
+        return {
+            "view": {
+                "users": get_users_with_perms(
+                    obj,
+                    only_with_perms_in=[view_codename],
+                    with_group_users=False,
+                ).values_list("id", flat=True),
+                "groups": get_groups_with_only_permission(
+                    obj,
+                    codename=view_codename,
+                ).values_list("id", flat=True),
+            },
+            "change": {
+                "users": get_users_with_perms(
+                    obj,
+                    only_with_perms_in=[change_codename],
+                    with_group_users=False,
+                ).values_list("id", flat=True),
+                "groups": get_groups_with_only_permission(
+                    obj,
+                    codename=change_codename,
+                ).values_list("id", flat=True),
+            },
+        }
+
+    def get_user_can_change(self, obj) -> bool:
+        checker = ObjectPermissionChecker(self.user) if self.user is not None else None
+        return (
+            obj.owner is None
+            or obj.owner == self.user
+            or (
+                self.user is not None
+                and checker.has_perm(f"change_{obj.__class__.__name__.lower()}", obj)
+            )
+        )
+
+    @staticmethod
+    def get_shared_object_pks(objects: Iterable):
+        """
+        Return the primary keys of the subset of objects that are shared.
+        """
+        try:
+            first_obj = next(iter(objects))
+        except StopIteration:
+            return set()
+
+        ctype = ContentType.objects.get_for_model(first_obj)
+        object_pks = list(obj.pk for obj in objects)
+        pk_type = type(first_obj.pk)
+
+        def get_pks_for_permission_type(model):
+            return map(
+                pk_type,  # coerce the pk to be the same type of the provided objects
+                model.objects.filter(
+                    content_type=ctype,
+                    object_pk__in=object_pks,
+                )
+                .values_list("object_pk", flat=True)
+                .distinct(),
+            )
+
+        UserObjectPermission = get_user_obj_perms_model()
+        GroupObjectPermission = get_group_obj_perms_model()
+        user_permission_pks = get_pks_for_permission_type(UserObjectPermission)
+        group_permission_pks = get_pks_for_permission_type(GroupObjectPermission)
+
+        return set(user_permission_pks) | set(group_permission_pks)
+
+    def get_is_shared_by_requester(self, obj: Document) -> bool:
+        # First check the context to see if `shared_object_pks` is set by the parent.
+        shared_object_pks = self.context.get("shared_object_pks")
+        # If not just check if the current object is shared.
+        if shared_object_pks is None:
+            shared_object_pks = self.get_shared_object_pks([obj])
+        return obj.owner == self.user and obj.id in shared_object_pks
+
+    permissions = SerializerMethodField(read_only=True)
+    user_can_change = SerializerMethodField(read_only=True)
+    is_shared_by_requester = SerializerMethodField(read_only=True)
+
+    set_permissions = SetPermissionsSerializer(
+        label="Set permissions",
+        allow_empty=True,
+        required=False,
+        write_only=True,
+    )
+    # other methods in mixin
+
+    def validate_unique_together(self, validated_data, instance=None):
+        # workaround for https://github.com/encode/django-rest-framework/issues/9358
+        if "owner" in validated_data and "name" in self.Meta.fields:
+            name = validated_data.get("name", instance.name if instance else None)
+            objects = (
+                self.Meta.model.objects.exclude(pk=instance.pk)
+                if instance
+                else self.Meta.model.objects.all()
+            )
+            not_unique = objects.filter(
+                owner=validated_data["owner"],
+                name=name,
+            ).exists()
+            if not_unique:
+                raise serializers.ValidationError(
+                    {"error": "Object violates owner / name unique constraint"},
+                )
+
+    def create(self, validated_data):
+        # default to current user if not set
+        request = self.context.get("request")
+        if (
+            "owner" not in validated_data
+            or (request is not None and "owner" not in request.data)
+        ) and self.user:
+            validated_data["owner"] = self.user
+        permissions = None
+        if "set_permissions" in validated_data:
+            permissions = validated_data.pop("set_permissions")
+        self.validate_unique_together(validated_data)
+        instance = super().create(validated_data)
+        if permissions is not None:
+            self._set_permissions(permissions, instance)
+        return instance
+
+    def update(self, instance, validated_data):
+        if "set_permissions" in validated_data:
+            self._set_permissions(validated_data["set_permissions"], instance)
+        self.validate_unique_together(validated_data, instance)
+        return super().update(instance, validated_data)
+
+
+class OwnedObjectListSerializer(serializers.ListSerializer):
+    def to_representation(self, documents):
+        self.child.context["shared_object_pks"] = self.child.get_shared_object_pks(
+            documents,
+        )
+        return super().to_representation(documents)
+
+
+class CorrespondentSerializer(MatchingModelSerializer, OwnedObjectSerializer):
+    last_correspondence = serializers.DateTimeField(read_only=True, required=False)
+
+    class Meta:
+        model = Correspondent
+        fields = (
+            "id",
+            "slug",
+            "name",
+            "match",
+            "matching_algorithm",
+            "is_insensitive",
+            "document_count",
+            "last_correspondence",
+            "owner",
+            "permissions",
+            "user_can_change",
+            "set_permissions",
+        )
+
+
+class DocumentTypeSerializer(MatchingModelSerializer, OwnedObjectSerializer):
+    class Meta:
+        model = DocumentType
+        fields = (
+            "id",
+            "slug",
+            "name",
+            "match",
+            "matching_algorithm",
+            "is_insensitive",
+            "document_count",
+            "owner",
+            "permissions",
+            "user_can_change",
+            "set_permissions",
+        )
+
+
+class DeprecatedColors:
+    COLOURS = (
+        (1, "#a6cee3"),
+        (2, "#1f78b4"),
+        (3, "#b2df8a"),
+        (4, "#33a02c"),
+        (5, "#fb9a99"),
+        (6, "#e31a1c"),
+        (7, "#fdbf6f"),
+        (8, "#ff7f00"),
+        (9, "#cab2d6"),
+        (10, "#6a3d9a"),
+        (11, "#b15928"),
+        (12, "#000000"),
+        (13, "#cccccc"),
+    )
+
+
+@extend_schema_field(
+    serializers.ChoiceField(
+        choices=DeprecatedColors.COLOURS,
+    ),
+)
+class ColorField(serializers.Field):
+    def to_internal_value(self, data):
+        for id, color in DeprecatedColors.COLOURS:
+            if id == data:
+                return color
+        raise serializers.ValidationError
+
+    def to_representation(self, value):
+        for id, color in DeprecatedColors.COLOURS:
+            if color == value:
+                return id
+        return 1
+
+
+class TagSerializerVersion1(MatchingModelSerializer, OwnedObjectSerializer):
+    colour = ColorField(source="color", default="#a6cee3")
+
+    class Meta:
+        model = Tag
+        fields = (
+            "id",
+            "slug",
+            "name",
+            "colour",
+            "match",
+            "matching_algorithm",
+            "is_insensitive",
+            "is_inbox_tag",
+            "document_count",
+            "owner",
+            "permissions",
+            "user_can_change",
+            "set_permissions",
+        )
+
+
+class TagSerializer(MatchingModelSerializer, OwnedObjectSerializer):
+    def get_text_color(self, obj) -> str:
+        try:
+            h = obj.color.lstrip("#")
+            rgb = tuple(int(h[i : i + 2], 16) / 256 for i in (0, 2, 4))
+            luminance = math.sqrt(
+                0.299 * math.pow(rgb[0], 2)
+                + 0.587 * math.pow(rgb[1], 2)
+                + 0.114 * math.pow(rgb[2], 2),
+            )
+            return "#ffffff" if luminance < 0.53 else "#000000"
+        except ValueError:
+            return "#000000"
+
+    text_color = serializers.SerializerMethodField()
+
+    class Meta:
+        model = Tag
+        fields = (
+            "id",
+            "slug",
+            "name",
+            "color",
+            "text_color",
+            "match",
+            "matching_algorithm",
+            "is_insensitive",
+            "is_inbox_tag",
+            "document_count",
+            "owner",
+            "permissions",
+            "user_can_change",
+            "set_permissions",
+        )
+
+    def validate_color(self, color):
+        regex = r"#[0-9a-fA-F]{6}"
+        if not re.match(regex, color):
+            raise serializers.ValidationError(_("Invalid color."))
+        return color
+
+
+class CorrespondentField(serializers.PrimaryKeyRelatedField):
+    def get_queryset(self):
+        return Correspondent.objects.all()
+
+
+class TagsField(serializers.PrimaryKeyRelatedField):
+    def get_queryset(self):
+        return Tag.objects.all()
+
+
+class DocumentTypeField(serializers.PrimaryKeyRelatedField):
+    def get_queryset(self):
+        return DocumentType.objects.all()
+
+
+class StoragePathField(serializers.PrimaryKeyRelatedField):
+    def get_queryset(self):
+        return StoragePath.objects.all()
+
+
+class CustomFieldSerializer(serializers.ModelSerializer):
+    def __init__(self, *args, **kwargs):
+        context = kwargs.get("context")
+        self.api_version = int(
+            context.get("request").version
+            if context and context.get("request")
+            else settings.REST_FRAMEWORK["DEFAULT_VERSION"],
+        )
+        super().__init__(*args, **kwargs)
+
+    data_type = serializers.ChoiceField(
+        choices=CustomField.FieldDataType,
+        read_only=False,
+    )
+
+    document_count = serializers.IntegerField(read_only=True)
+
+    class Meta:
+        model = CustomField
+        fields = [
+            "id",
+            "name",
+            "data_type",
+            "extra_data",
+            "document_count",
+        ]
+
+    def validate(self, attrs):
+        # TODO: remove pending https://github.com/encode/django-rest-framework/issues/7173
+        name = attrs.get(
+            "name",
+            self.instance.name if hasattr(self.instance, "name") else None,
+        )
+        objects = (
+            self.Meta.model.objects.exclude(
+                pk=self.instance.pk,
+            )
+            if self.instance is not None
+            else self.Meta.model.objects.all()
+        )
+        if ("name" in attrs) and objects.filter(
+            name=name,
+        ).exists():
+            raise serializers.ValidationError(
+                {"error": "Object violates name unique constraint"},
+            )
+        if (
+            "data_type" in attrs
+            and attrs["data_type"] == CustomField.FieldDataType.SELECT
+        ) or (
+            self.instance
+            and self.instance.data_type == CustomField.FieldDataType.SELECT
+        ):
+            if (
+                "extra_data" not in attrs
+                or "select_options" not in attrs["extra_data"]
+                or not isinstance(attrs["extra_data"]["select_options"], list)
+                or len(attrs["extra_data"]["select_options"]) == 0
+                or not all(
+                    len(option.get("label", "")) > 0
+                    for option in attrs["extra_data"]["select_options"]
+                )
+            ):
+                raise serializers.ValidationError(
+                    {"error": "extra_data.select_options must be a valid list"},
+                )
+            # labels are valid, generate ids if not present
+            for option in attrs["extra_data"]["select_options"]:
+                if option.get("id") is None:
+                    option["id"] = get_random_string(length=16)
+        elif (
+            "data_type" in attrs
+            and attrs["data_type"] == CustomField.FieldDataType.MONETARY
+            and "extra_data" in attrs
+            and "default_currency" in attrs["extra_data"]
+            and attrs["extra_data"]["default_currency"] is not None
+            and (
+                not isinstance(attrs["extra_data"]["default_currency"], str)
+                or (
+                    len(attrs["extra_data"]["default_currency"]) > 0
+                    and len(attrs["extra_data"]["default_currency"]) != 3
+                )
+            )
+        ):
+            raise serializers.ValidationError(
+                {"error": "extra_data.default_currency must be a 3-character string"},
+            )
+        return super().validate(attrs)
+
+    def to_internal_value(self, data):
+        ret = super().to_internal_value(data)
+
+        if (
+            self.api_version < 7
+            and ret.get("data_type", "") == CustomField.FieldDataType.SELECT
+            and isinstance(ret.get("extra_data", {}).get("select_options"), list)
+        ):
+            ret["extra_data"]["select_options"] = [
+                {
+                    "label": option,
+                    "id": get_random_string(length=16),
+                }
+                for option in ret["extra_data"]["select_options"]
+            ]
+
+        return ret
+
+    def to_representation(self, instance):
+        ret = super().to_representation(instance)
+
+        if (
+            self.api_version < 7
+            and instance.data_type == CustomField.FieldDataType.SELECT
+        ):
+            # Convert the select options with ids to a list of strings
+            ret["extra_data"]["select_options"] = [
+                option["label"] for option in ret["extra_data"]["select_options"]
+            ]
+
+        return ret
+
+
+class ReadWriteSerializerMethodField(serializers.SerializerMethodField):
+    """
+    Based on https://stackoverflow.com/a/62579804
+    """
+
+    def __init__(self, method_name=None, *args, **kwargs):
+        self.method_name = method_name
+        kwargs["source"] = "*"
+        super(serializers.SerializerMethodField, self).__init__(*args, **kwargs)
+
+    def to_internal_value(self, data):
+        return {self.field_name: data}
+
+
+class CustomFieldInstanceSerializer(serializers.ModelSerializer):
+    field = serializers.PrimaryKeyRelatedField(queryset=CustomField.objects.all())
+    value = ReadWriteSerializerMethodField(allow_null=True)
+
+    def create(self, validated_data):
+        # An instance is attached to a document
+        document: Document = validated_data["document"]
+        # And to a CustomField
+        custom_field: CustomField = validated_data["field"]
+        # This key must exist, as it is validated
+        data_store_name = CustomFieldInstance.get_value_field_name(
+            custom_field.data_type,
+        )
+
+        if custom_field.data_type == CustomField.FieldDataType.DOCUMENTLINK:
+            # prior to update so we can look for any docs that are going to be removed
+            bulk_edit.reflect_doclinks(document, custom_field, validated_data["value"])
+
+        # Actually update or create the instance, providing the value
+        # to fill in the correct attribute based on the type
+        instance, _ = CustomFieldInstance.objects.update_or_create(
+            document=document,
+            field=custom_field,
+            defaults={data_store_name: validated_data["value"]},
+        )
+        return instance
+
+    def get_value(self, obj: CustomFieldInstance) -> str | int | float | dict | None:
+        return obj.value
+
+    def validate(self, data):
+        """
+        Probably because we're kind of doing it odd, validation from the model
+        doesn't run against the field "value", so we have to re-create it here.
+
+        Don't like it, but it is better than returning an HTTP 500 when the database
+        hates the value
+        """
+        data = super().validate(data)
+        field: CustomField = data["field"]
+        if "value" in data and data["value"] is not None:
+            if (
+                field.data_type == CustomField.FieldDataType.URL
+                and len(data["value"]) > 0
+            ):
+                uri_validator(data["value"])
+            elif field.data_type == CustomField.FieldDataType.INT:
+                integer_validator(data["value"])
+            elif (
+                field.data_type == CustomField.FieldDataType.MONETARY
+                and data["value"] != ""
+            ):
+                try:
+                    # First try to validate as a number from legacy format
+                    DecimalValidator(max_digits=12, decimal_places=2)(
+                        Decimal(str(data["value"])),
+                    )
+                except Exception:
+                    # If that fails, try to validate as a monetary string
+                    RegexValidator(
+                        regex=r"^[A-Z]{3}-?\d+(\.\d{1,2})$",
+                        message="Must be a two-decimal number with optional currency code e.g. GBP123.45",
+                    )(data["value"])
+            elif field.data_type == CustomField.FieldDataType.STRING:
+                MaxLengthValidator(limit_value=128)(data["value"])
+            elif field.data_type == CustomField.FieldDataType.SELECT:
+                select_options = field.extra_data["select_options"]
+                try:
+                    next(
+                        option
+                        for option in select_options
+                        if option["id"] == data["value"]
+                    )
+                except Exception:
+                    raise serializers.ValidationError(
+                        f"Value must be an id of an element in {select_options}",
+                    )
+            elif field.data_type == CustomField.FieldDataType.DOCUMENTLINK:
+                if not (isinstance(data["value"], list) or data["value"] is None):
+                    raise serializers.ValidationError(
+                        "Value must be a list",
+                    )
+                doc_ids = data["value"]
+                if Document.objects.filter(id__in=doc_ids).count() != len(
+                    data["value"],
+                ):
+                    raise serializers.ValidationError(
+                        "Some documents in value don't exist or were specified twice.",
+                    )
+
+        return data
+
+    def get_api_version(self):
+        return int(
+            self.context.get("request").version
+            if self.context.get("request")
+            else settings.REST_FRAMEWORK["DEFAULT_VERSION"],
+        )
+
+    def to_internal_value(self, data):
+        ret = super().to_internal_value(data)
+
+        if (
+            self.get_api_version() < 7
+            and ret.get("field").data_type == CustomField.FieldDataType.SELECT
+            and ret.get("value") is not None
+        ):
+            # Convert the index of the option in the field.extra_data["select_options"]
+            # list to the options unique id
+            ret["value"] = ret.get("field").extra_data["select_options"][ret["value"]][
+                "id"
+            ]
+
+        return ret
+
+    def to_representation(self, instance):
+        ret = super().to_representation(instance)
+
+        if (
+            self.get_api_version() < 7
+            and instance.field.data_type == CustomField.FieldDataType.SELECT
+        ):
+            # return the index of the option in the field.extra_data["select_options"] list
+            ret["value"] = next(
+                (
+                    idx
+                    for idx, option in enumerate(
+                        instance.field.extra_data["select_options"],
+                    )
+                    if option["id"] == instance.value
+                ),
+                None,
+            )
+
+        return ret
+
+    class Meta:
+        model = CustomFieldInstance
+        fields = [
+            "value",
+            "field",
+        ]
+
+
+class BasicUserSerializer(serializers.ModelSerializer):
+    # Different than paperless.serializers.UserSerializer
+    class Meta:
+        model = User
+        fields = ["id", "username", "first_name", "last_name"]
+
+
+class NotesSerializer(serializers.ModelSerializer):
+    user = BasicUserSerializer(read_only=True)
+
+    class Meta:
+        model = Note
+        fields = ["id", "note", "created", "user"]
+        ordering = ["-created"]
+
+
+class DocumentSerializer(
+    OwnedObjectSerializer,
+    NestedUpdateMixin,
+    DynamicFieldsModelSerializer,
+):
+    correspondent = CorrespondentField(allow_null=True)
+    tags = TagsField(many=True)
+    document_type = DocumentTypeField(allow_null=True)
+    storage_path = StoragePathField(allow_null=True)
+
+    original_file_name = SerializerMethodField()
+    archived_file_name = SerializerMethodField()
+    created_date = serializers.DateField(required=False)
+    page_count = SerializerMethodField()
+
+    notes = NotesSerializer(many=True, required=False, read_only=True)
+
+    custom_fields = CustomFieldInstanceSerializer(
+        many=True,
+        allow_null=False,
+        required=False,
+    )
+
+    owner = serializers.PrimaryKeyRelatedField(
+        queryset=User.objects.all(),
+        required=False,
+        allow_null=True,
+    )
+
+    remove_inbox_tags = serializers.BooleanField(
+        default=False,
+        write_only=True,
+        allow_null=True,
+        required=False,
+    )
+
+    def get_page_count(self, obj) -> int | None:
+        return obj.page_count
+
+    def get_original_file_name(self, obj) -> str | None:
+        return obj.original_filename
+
+    def get_archived_file_name(self, obj) -> str | None:
+        if obj.has_archive_version:
+            return obj.get_public_filename(archive=True)
+        else:
+            return None
+
+    def to_representation(self, instance):
+        doc = super().to_representation(instance)
+        if self.truncate_content and "content" in self.fields:
+            doc["content"] = doc.get("content")[0:550]
+        return doc
+
+    def validate(self, attrs):
+        if (
+            "archive_serial_number" in attrs
+            and attrs["archive_serial_number"] is not None
+            and len(str(attrs["archive_serial_number"])) > 0
+            and Document.deleted_objects.filter(
+                archive_serial_number=attrs["archive_serial_number"],
+            ).exists()
+        ):
+            raise serializers.ValidationError(
+                {
+                    "archive_serial_number": [
+                        "Document with this Archive Serial Number already exists in the trash.",
+                    ],
+                },
+            )
+        return super().validate(attrs)
+
+    def update(self, instance: Document, validated_data):
+        if "created_date" in validated_data and "created" not in validated_data:
+            new_datetime = datetime.datetime.combine(
+                validated_data.get("created_date"),
+                datetime.time(0, 0, 0, 0, zoneinfo.ZoneInfo(settings.TIME_ZONE)),
+            )
+            instance.created = new_datetime
+            instance.save()
+        if "created_date" in validated_data:
+            validated_data.pop("created_date")
+        if instance.custom_fields.count() > 0 and "custom_fields" in validated_data:
+            incoming_custom_fields = [
+                field["field"] for field in validated_data["custom_fields"]
+            ]
+            for custom_field_instance in instance.custom_fields.filter(
+                field__data_type=CustomField.FieldDataType.DOCUMENTLINK,
+            ):
+                if (
+                    custom_field_instance.field not in incoming_custom_fields
+                    and custom_field_instance.value is not None
+                ):
+                    # Doc link field is being removed entirely
+                    for doc_id in custom_field_instance.value:
+                        bulk_edit.remove_doclink(
+                            instance,
+                            custom_field_instance.field,
+                            doc_id,
+                        )
+        if validated_data.get("remove_inbox_tags"):
+            tag_ids_being_added = (
+                [
+                    tag.id
+                    for tag in validated_data["tags"]
+                    if tag not in instance.tags.all()
+                ]
+                if "tags" in validated_data
+                else []
+            )
+            inbox_tags_not_being_added = Tag.objects.filter(is_inbox_tag=True).exclude(
+                id__in=tag_ids_being_added,
+            )
+            if "tags" in validated_data:
+                validated_data["tags"] = [
+                    tag
+                    for tag in validated_data["tags"]
+                    if tag not in inbox_tags_not_being_added
+                ]
+            else:
+                validated_data["tags"] = [
+                    tag
+                    for tag in instance.tags.all()
+                    if tag not in inbox_tags_not_being_added
+                ]
+        if settings.AUDIT_LOG_ENABLED:
+            with set_actor(self.user):
+                super().update(instance, validated_data)
+        else:
+            super().update(instance, validated_data)
+        # hard delete custom field instances that were soft deleted
+        CustomFieldInstance.deleted_objects.filter(document=instance).delete()
+        return instance
+
+    def __init__(self, *args, **kwargs):
+        self.truncate_content = kwargs.pop("truncate_content", False)
+
+        # return full permissions if we're doing a PATCH or PUT
+        context = kwargs.get("context")
+        if context is not None and (
+            context.get("request").method == "PATCH"
+            or context.get("request").method == "PUT"
+        ):
+            kwargs["full_perms"] = True
+
+        super().__init__(*args, **kwargs)
+
+    class Meta:
+        model = Document
+        fields = (
+            "id",
+            "correspondent",
+            "document_type",
+            "storage_path",
+            "title",
+            "content",
+            "tags",
+            "created",
+            "created_date",
+            "modified",
+            "added",
+            "deleted_at",
+            "archive_serial_number",
+            "original_file_name",
+            "archived_file_name",
+            "owner",
+            "permissions",
+            "user_can_change",
+            "is_shared_by_requester",
+            "set_permissions",
+            "notes",
+            "custom_fields",
+            "remove_inbox_tags",
+            "page_count",
+            "mime_type",
+        )
+        list_serializer_class = OwnedObjectListSerializer
+
+
+class SearchResultListSerializer(serializers.ListSerializer):
+    def to_representation(self, hits):
+        document_ids = [hit["id"] for hit in hits]
+        # Fetch all Document objects in the list in one SQL query.
+        documents = self.child.fetch_documents(document_ids)
+        self.child.context["documents"] = documents
+        # Also check if they are shared with other users / groups.
+        self.child.context["shared_object_pks"] = self.child.get_shared_object_pks(
+            documents.values(),
+        )
+
+        return super().to_representation(hits)
+
+
+class SearchResultSerializer(DocumentSerializer):
+    @staticmethod
+    def fetch_documents(ids):
+        """
+        Return a dict that maps given document IDs to Document objects.
+        """
+        return {
+            document.id: document
+            for document in Document.objects.select_related(
+                "correspondent",
+                "storage_path",
+                "document_type",
+                "owner",
+            )
+            .prefetch_related("tags", "custom_fields", "notes")
+            .filter(id__in=ids)
+        }
+
+    def to_representation(self, hit):
+        # Again we first check if the parent has already fetched the documents.
+        documents = self.context.get("documents")
+        # Otherwise we fetch this document.
+        if documents is None:  # pragma: no cover
+            # In practice we only serialize **lists** of whoosh.searching.Hit.
+            # I'm keeping this check for completeness but marking it no cover for now.
+            documents = self.fetch_documents([hit["id"]])
+        document = documents[hit["id"]]
+
+        notes = ",".join(
+            [str(c.note) for c in document.notes.all()],
+        )
+        r = super().to_representation(document)
+        r["__search_hit__"] = {
+            "score": hit.score,
+            "highlights": hit.highlights("content", text=document.content),
+            "note_highlights": (
+                hit.highlights("notes", text=notes) if document else None
+            ),
+            "rank": hit.rank,
+        }
+
+        return r
+
+    class Meta(DocumentSerializer.Meta):
+        list_serializer_class = SearchResultListSerializer
+
+
+class SavedViewFilterRuleSerializer(serializers.ModelSerializer):
+    class Meta:
+        model = SavedViewFilterRule
+        fields = ["rule_type", "value"]
+
+
+class SavedViewSerializer(OwnedObjectSerializer):
+    filter_rules = SavedViewFilterRuleSerializer(many=True)
+
+    class Meta:
+        model = SavedView
+        fields = [
+            "id",
+            "name",
+            "show_on_dashboard",
+            "show_in_sidebar",
+            "sort_field",
+            "sort_reverse",
+            "filter_rules",
+            "page_size",
+            "display_mode",
+            "display_fields",
+            "owner",
+            "permissions",
+            "user_can_change",
+            "set_permissions",
+        ]
+
+    def validate(self, attrs):
+        attrs = super().validate(attrs)
+        if "display_fields" in attrs and attrs["display_fields"] is not None:
+            for field in attrs["display_fields"]:
+                if (
+                    SavedView.DisplayFields.CUSTOM_FIELD[:-2] in field
+                ):  # i.e. check for 'custom_field_' prefix
+                    field_id = int(re.search(r"\d+", field)[0])
+                    if not CustomField.objects.filter(id=field_id).exists():
+                        raise serializers.ValidationError(
+                            f"Invalid field: {field}",
+                        )
+                elif field not in SavedView.DisplayFields.values:
+                    raise serializers.ValidationError(
+                        f"Invalid field: {field}",
+                    )
+        return attrs
+
+    def update(self, instance, validated_data):
+        if "filter_rules" in validated_data:
+            rules_data = validated_data.pop("filter_rules")
+        else:
+            rules_data = None
+        if "user" in validated_data:
+            # backwards compatibility
+            validated_data["owner"] = validated_data.pop("user")
+        if (
+            "display_fields" in validated_data
+            and isinstance(
+                validated_data["display_fields"],
+                list,
+            )
+            and len(validated_data["display_fields"]) == 0
+        ):
+            validated_data["display_fields"] = None
+        super().update(instance, validated_data)
+        if rules_data is not None:
+            SavedViewFilterRule.objects.filter(saved_view=instance).delete()
+            for rule_data in rules_data:
+                SavedViewFilterRule.objects.create(saved_view=instance, **rule_data)
+        return instance
+
+    def create(self, validated_data):
+        rules_data = validated_data.pop("filter_rules")
+        if "user" in validated_data:
+            # backwards compatibility
+            validated_data["owner"] = validated_data.pop("user")
+        saved_view = SavedView.objects.create(**validated_data)
+        for rule_data in rules_data:
+            SavedViewFilterRule.objects.create(saved_view=saved_view, **rule_data)
+        return saved_view
+
+
+class DocumentListSerializer(serializers.Serializer):
+    documents = serializers.ListField(
+        required=True,
+        label="Documents",
+        write_only=True,
+        child=serializers.IntegerField(),
+    )
+
+    def _validate_document_id_list(self, documents, name="documents"):
+        if not isinstance(documents, list):
+            raise serializers.ValidationError(f"{name} must be a list")
+        if not all(isinstance(i, int) for i in documents):
+            raise serializers.ValidationError(f"{name} must be a list of integers")
+        count = Document.objects.filter(id__in=documents).count()
+        if not count == len(documents):
+            raise serializers.ValidationError(
+                f"Some documents in {name} don't exist or were specified twice.",
+            )
+
+    def validate_documents(self, documents):
+        self._validate_document_id_list(documents)
+        return documents
+
+
+class BulkEditSerializer(
+    SerializerWithPerms,
+    DocumentListSerializer,
+    SetPermissionsMixin,
+):
+    method = serializers.ChoiceField(
+        choices=[
+            "set_correspondent",
+            "set_document_type",
+            "set_storage_path",
+            "add_tag",
+            "remove_tag",
+            "modify_tags",
+            "modify_custom_fields",
+            "delete",
+            "reprocess",
+            "set_permissions",
+            "rotate",
+            "merge",
+            "split",
+            "delete_pages",
+        ],
+        label="Method",
+        write_only=True,
+    )
+
+    parameters = serializers.DictField(allow_empty=True, default={}, write_only=True)
+
+    def _validate_tag_id_list(self, tags, name="tags"):
+        if not isinstance(tags, list):
+            raise serializers.ValidationError(f"{name} must be a list")
+        if not all(isinstance(i, int) for i in tags):
+            raise serializers.ValidationError(f"{name} must be a list of integers")
+        count = Tag.objects.filter(id__in=tags).count()
+        if not count == len(tags):
+            raise serializers.ValidationError(
+                f"Some tags in {name} don't exist or were specified twice.",
+            )
+
+    def _validate_custom_field_id_list_or_dict(
+        self,
+        custom_fields,
+        name="custom_fields",
+    ):
+        ids = custom_fields
+        if isinstance(custom_fields, dict):
+            try:
+                ids = [int(i[0]) for i in custom_fields.items()]
+            except Exception as e:
+                logger.exception(f"Error validating custom fields: {e}")
+                raise serializers.ValidationError(
+                    f"{name} must be a list of integers or a dict of id:value pairs, see the log for details",
+                )
+        elif not isinstance(custom_fields, list) or not all(
+            isinstance(i, int) for i in ids
+        ):
+            raise serializers.ValidationError(
+                f"{name} must be a list of integers or a dict of id:value pairs",
+            )
+        count = CustomField.objects.filter(id__in=ids).count()
+        if not count == len(ids):
+            raise serializers.ValidationError(
+                f"Some custom fields in {name} don't exist or were specified twice.",
+            )
+
+    def validate_method(self, method):
+        if method == "set_correspondent":
+            return bulk_edit.set_correspondent
+        elif method == "set_document_type":
+            return bulk_edit.set_document_type
+        elif method == "set_storage_path":
+            return bulk_edit.set_storage_path
+        elif method == "add_tag":
+            return bulk_edit.add_tag
+        elif method == "remove_tag":
+            return bulk_edit.remove_tag
+        elif method == "modify_tags":
+            return bulk_edit.modify_tags
+        elif method == "modify_custom_fields":
+            return bulk_edit.modify_custom_fields
+        elif method == "delete":
+            return bulk_edit.delete
+        elif method == "redo_ocr" or method == "reprocess":
+            return bulk_edit.reprocess
+        elif method == "set_permissions":
+            return bulk_edit.set_permissions
+        elif method == "rotate":
+            return bulk_edit.rotate
+        elif method == "merge":
+            return bulk_edit.merge
+        elif method == "split":
+            return bulk_edit.split
+        elif method == "delete_pages":
+            return bulk_edit.delete_pages
+        else:
+            raise serializers.ValidationError("Unsupported method.")
+
+    def _validate_parameters_tags(self, parameters):
+        if "tag" in parameters:
+            tag_id = parameters["tag"]
+            try:
+                Tag.objects.get(id=tag_id)
+            except Tag.DoesNotExist:
+                raise serializers.ValidationError("Tag does not exist")
+        else:
+            raise serializers.ValidationError("tag not specified")
+
+    def _validate_parameters_document_type(self, parameters):
+        if "document_type" in parameters:
+            document_type_id = parameters["document_type"]
+            if document_type_id is None:
+                # None is ok
+                return
+            try:
+                DocumentType.objects.get(id=document_type_id)
+            except DocumentType.DoesNotExist:
+                raise serializers.ValidationError("Document type does not exist")
+        else:
+            raise serializers.ValidationError("document_type not specified")
+
+    def _validate_parameters_correspondent(self, parameters):
+        if "correspondent" in parameters:
+            correspondent_id = parameters["correspondent"]
+            if correspondent_id is None:
+                return
+            try:
+                Correspondent.objects.get(id=correspondent_id)
+            except Correspondent.DoesNotExist:
+                raise serializers.ValidationError("Correspondent does not exist")
+        else:
+            raise serializers.ValidationError("correspondent not specified")
+
+    def _validate_storage_path(self, parameters):
+        if "storage_path" in parameters:
+            storage_path_id = parameters["storage_path"]
+            if storage_path_id is None:
+                return
+            try:
+                StoragePath.objects.get(id=storage_path_id)
+            except StoragePath.DoesNotExist:
+                raise serializers.ValidationError(
+                    "Storage path does not exist",
+                )
+        else:
+            raise serializers.ValidationError("storage path not specified")
+
+    def _validate_parameters_modify_tags(self, parameters):
+        if "add_tags" in parameters:
+            self._validate_tag_id_list(parameters["add_tags"], "add_tags")
+        else:
+            raise serializers.ValidationError("add_tags not specified")
+
+        if "remove_tags" in parameters:
+            self._validate_tag_id_list(parameters["remove_tags"], "remove_tags")
+        else:
+            raise serializers.ValidationError("remove_tags not specified")
+
+    def _validate_parameters_modify_custom_fields(self, parameters):
+        if "add_custom_fields" in parameters:
+            self._validate_custom_field_id_list_or_dict(
+                parameters["add_custom_fields"],
+                "add_custom_fields",
+            )
+        else:
+            raise serializers.ValidationError("add_custom_fields not specified")
+
+        if "remove_custom_fields" in parameters:
+            self._validate_custom_field_id_list_or_dict(
+                parameters["remove_custom_fields"],
+                "remove_custom_fields",
+            )
+        else:
+            raise serializers.ValidationError("remove_custom_fields not specified")
+
+    def _validate_owner(self, owner):
+        ownerUser = User.objects.get(pk=owner)
+        if ownerUser is None:
+            raise serializers.ValidationError("Specified owner cannot be found")
+        return ownerUser
+
+    def _validate_parameters_set_permissions(self, parameters):
+        parameters["set_permissions"] = self.validate_set_permissions(
+            parameters["set_permissions"],
+        )
+        if "owner" in parameters and parameters["owner"] is not None:
+            self._validate_owner(parameters["owner"])
+        if "merge" not in parameters:
+            parameters["merge"] = False
+
+    def _validate_parameters_rotate(self, parameters):
+        try:
+            if (
+                "degrees" not in parameters
+                or not float(parameters["degrees"]).is_integer()
+            ):
+                raise serializers.ValidationError("invalid rotation degrees")
+        except ValueError:
+            raise serializers.ValidationError("invalid rotation degrees")
+
+    def _validate_parameters_split(self, parameters):
+        if "pages" not in parameters:
+            raise serializers.ValidationError("pages not specified")
+        try:
+            pages = []
+            docs = parameters["pages"].split(",")
+            for doc in docs:
+                if "-" in doc:
+                    pages.append(
+                        [
+                            x
+                            for x in range(
+                                int(doc.split("-")[0]),
+                                int(doc.split("-")[1]) + 1,
+                            )
+                        ],
+                    )
+                else:
+                    pages.append([int(doc)])
+            parameters["pages"] = pages
+        except ValueError:
+            raise serializers.ValidationError("invalid pages specified")
+
+        if "delete_originals" in parameters:
+            if not isinstance(parameters["delete_originals"], bool):
+                raise serializers.ValidationError("delete_originals must be a boolean")
+        else:
+            parameters["delete_originals"] = False
+
+    def _validate_parameters_delete_pages(self, parameters):
+        if "pages" not in parameters:
+            raise serializers.ValidationError("pages not specified")
+        if not isinstance(parameters["pages"], list):
+            raise serializers.ValidationError("pages must be a list")
+        if not all(isinstance(i, int) for i in parameters["pages"]):
+            raise serializers.ValidationError("pages must be a list of integers")
+
+    def _validate_parameters_merge(self, parameters):
+        if "delete_originals" in parameters:
+            if not isinstance(parameters["delete_originals"], bool):
+                raise serializers.ValidationError("delete_originals must be a boolean")
+        else:
+            parameters["delete_originals"] = False
+        if "archive_fallback" in parameters:
+            if not isinstance(parameters["archive_fallback"], bool):
+                raise serializers.ValidationError("archive_fallback must be a boolean")
+        else:
+            parameters["archive_fallback"] = False
+
+    def validate(self, attrs):
+        method = attrs["method"]
+        parameters = attrs["parameters"]
+
+        if method == bulk_edit.set_correspondent:
+            self._validate_parameters_correspondent(parameters)
+        elif method == bulk_edit.set_document_type:
+            self._validate_parameters_document_type(parameters)
+        elif method == bulk_edit.add_tag or method == bulk_edit.remove_tag:
+            self._validate_parameters_tags(parameters)
+        elif method == bulk_edit.modify_tags:
+            self._validate_parameters_modify_tags(parameters)
+        elif method == bulk_edit.set_storage_path:
+            self._validate_storage_path(parameters)
+        elif method == bulk_edit.modify_custom_fields:
+            self._validate_parameters_modify_custom_fields(parameters)
+        elif method == bulk_edit.set_permissions:
+            self._validate_parameters_set_permissions(parameters)
+        elif method == bulk_edit.rotate:
+            self._validate_parameters_rotate(parameters)
+        elif method == bulk_edit.split:
+            if len(attrs["documents"]) > 1:
+                raise serializers.ValidationError(
+                    "Split method only supports one document",
+                )
+            self._validate_parameters_split(parameters)
+        elif method == bulk_edit.delete_pages:
+            if len(attrs["documents"]) > 1:
+                raise serializers.ValidationError(
+                    "Delete pages method only supports one document",
+                )
+            self._validate_parameters_delete_pages(parameters)
+        elif method == bulk_edit.merge:
+            self._validate_parameters_merge(parameters)
+
+        return attrs
+
+
+class PostDocumentSerializer(serializers.Serializer):
+    created = serializers.DateTimeField(
+        label="Created",
+        allow_null=True,
+        write_only=True,
+        required=False,
+    )
+
+    document = serializers.FileField(
+        label="Document",
+        write_only=True,
+    )
+
+    title = serializers.CharField(
+        label="Title",
+        write_only=True,
+        required=False,
+    )
+
+    correspondent = serializers.PrimaryKeyRelatedField(
+        queryset=Correspondent.objects.all(),
+        label="Correspondent",
+        allow_null=True,
+        write_only=True,
+        required=False,
+    )
+
+    document_type = serializers.PrimaryKeyRelatedField(
+        queryset=DocumentType.objects.all(),
+        label="Document type",
+        allow_null=True,
+        write_only=True,
+        required=False,
+    )
+
+    storage_path = serializers.PrimaryKeyRelatedField(
+        queryset=StoragePath.objects.all(),
+        label="Storage path",
+        allow_null=True,
+        write_only=True,
+        required=False,
+    )
+
+    tags = serializers.PrimaryKeyRelatedField(
+        many=True,
+        queryset=Tag.objects.all(),
+        label="Tags",
+        write_only=True,
+        required=False,
+    )
+
+    archive_serial_number = serializers.IntegerField(
+        label="ASN",
+        write_only=True,
+        required=False,
+        min_value=Document.ARCHIVE_SERIAL_NUMBER_MIN,
+        max_value=Document.ARCHIVE_SERIAL_NUMBER_MAX,
+    )
+
+    custom_fields = serializers.PrimaryKeyRelatedField(
+        many=True,
+        queryset=CustomField.objects.all(),
+        label="Custom fields",
+        write_only=True,
+        required=False,
+    )
+
+    from_webui = serializers.BooleanField(
+        label="Documents are from Paperless-ngx WebUI",
+        write_only=True,
+        required=False,
+    )
+
+    def validate_document(self, document):
+        document_data = document.file.read()
+        mime_type = magic.from_buffer(document_data, mime=True)
+
+        if not is_mime_type_supported(mime_type):
+            if (
+                mime_type in settings.CONSUMER_PDF_RECOVERABLE_MIME_TYPES
+                and document.name.endswith(
+                    ".pdf",
+                )
+            ):
+                # If the file is an invalid PDF, we can try to recover it later in the consumer
+                mime_type = "application/pdf"
+            else:
+                raise serializers.ValidationError(
+                    _("File type %(type)s not supported") % {"type": mime_type},
+                )
+
+        return document.name, document_data
+
+    def validate_correspondent(self, correspondent):
+        if correspondent:
+            return correspondent.id
+        else:
+            return None
+
+    def validate_document_type(self, document_type):
+        if document_type:
+            return document_type.id
+        else:
+            return None
+
+    def validate_storage_path(self, storage_path):
+        if storage_path:
+            return storage_path.id
+        else:
+            return None
+
+    def validate_tags(self, tags):
+        if tags:
+            return [tag.id for tag in tags]
+        else:
+            return None
+
+    def validate_custom_fields(self, custom_fields):
+        if custom_fields:
+            return [custom_field.id for custom_field in custom_fields]
+        else:
+            return None
+
+
+class BulkDownloadSerializer(DocumentListSerializer):
+    content = serializers.ChoiceField(
+        choices=["archive", "originals", "both"],
+        default="archive",
+    )
+
+    compression = serializers.ChoiceField(
+        choices=["none", "deflated", "bzip2", "lzma"],
+        default="none",
+    )
+
+    follow_formatting = serializers.BooleanField(
+        default=False,
+    )
+
+    def validate_compression(self, compression):
+        import zipfile
+
+        return {
+            "none": zipfile.ZIP_STORED,
+            "deflated": zipfile.ZIP_DEFLATED,
+            "bzip2": zipfile.ZIP_BZIP2,
+            "lzma": zipfile.ZIP_LZMA,
+        }[compression]
+
+
+class StoragePathSerializer(MatchingModelSerializer, OwnedObjectSerializer):
+    class Meta:
+        model = StoragePath
+        fields = (
+            "id",
+            "slug",
+            "name",
+            "path",
+            "match",
+            "matching_algorithm",
+            "is_insensitive",
+            "document_count",
+            "owner",
+            "permissions",
+            "user_can_change",
+            "set_permissions",
+        )
+
+    def validate_path(self, path: str):
+        converted_path = convert_format_str_to_template_format(path)
+        if converted_path != path:
+            logger.warning(
+                f"Storage path {path} is not using the new style format, consider updating",
+            )
+        result = validate_filepath_template_and_render(converted_path)
+
+        if result is None:
+            raise serializers.ValidationError(_("Invalid variable detected."))
+
+        return converted_path
+
+    def update(self, instance, validated_data):
+        """
+        When a storage path is updated, see if documents
+        using it require a rename/move
+        """
+        doc_ids = [doc.id for doc in instance.documents.all()]
+        if len(doc_ids):
+            bulk_edit.bulk_update_documents.delay(doc_ids)
+
+        return super().update(instance, validated_data)
+
+
+class UiSettingsViewSerializer(serializers.ModelSerializer):
+    class Meta:
+        model = UiSettings
+        depth = 1
+        fields = [
+            "id",
+            "settings",
+        ]
+
+    def validate_settings(self, settings):
+        # we never save update checking backend setting
+        if "update_checking" in settings:
+            try:
+                settings["update_checking"].pop("backend_setting")
+            except KeyError:
+                pass
+        return settings
+
+    def create(self, validated_data):
+        ui_settings = UiSettings.objects.update_or_create(
+            user=validated_data.get("user"),
+            defaults={"settings": validated_data.get("settings", None)},
+        )
+        return ui_settings
+
+
+class TasksViewSerializer(OwnedObjectSerializer):
+    class Meta:
+        model = PaperlessTask
+        fields = (
+            "id",
+            "task_id",
+            "task_name",
+            "task_file_name",
+            "date_created",
+            "date_done",
+            "type",
+            "status",
+            "result",
+            "acknowledged",
+            "related_document",
+            "owner",
+        )
+
+    related_document = serializers.SerializerMethodField()
+    created_doc_re = re.compile(r"New document id (\d+) created")
+    duplicate_doc_re = re.compile(r"It is a duplicate of .* \(#(\d+)\)")
+
+    def get_related_document(self, obj) -> str | None:
+        result = None
+        re = None
+        if obj.result:
+            match obj.status:
+                case states.SUCCESS:
+                    re = self.created_doc_re
+                case states.FAILURE:
+                    re = (
+                        self.duplicate_doc_re
+                        if "existing document is in the trash" not in obj.result
+                        else None
+                    )
+            if re is not None:
+                try:
+                    result = re.search(obj.result).group(1)
+                except Exception:
+                    pass
+
+        return result
+
+
+class RunTaskViewSerializer(serializers.Serializer):
+    task_name = serializers.ChoiceField(
+        choices=PaperlessTask.TaskName.choices,
+        label="Task Name",
+        write_only=True,
+    )
+
+
+class AcknowledgeTasksViewSerializer(serializers.Serializer):
+    tasks = serializers.ListField(
+        required=True,
+        label="Tasks",
+        write_only=True,
+        child=serializers.IntegerField(),
+    )
+
+    def _validate_task_id_list(self, tasks, name="tasks"):
+        if not isinstance(tasks, list):
+            raise serializers.ValidationError(f"{name} must be a list")
+        if not all(isinstance(i, int) for i in tasks):
+            raise serializers.ValidationError(f"{name} must be a list of integers")
+        count = PaperlessTask.objects.filter(id__in=tasks).count()
+        if not count == len(tasks):
+            raise serializers.ValidationError(
+                f"Some tasks in {name} don't exist or were specified twice.",
+            )
+
+    def validate_tasks(self, tasks):
+        self._validate_task_id_list(tasks)
+        return tasks
+
+
+class ShareLinkSerializer(OwnedObjectSerializer):
+    class Meta:
+        model = ShareLink
+        fields = (
+            "id",
+            "created",
+            "expiration",
+            "slug",
+            "document",
+            "file_version",
+        )
+
+    def create(self, validated_data):
+        validated_data["slug"] = get_random_string(50)
+        return super().create(validated_data)
+
+
+class BulkEditObjectsSerializer(SerializerWithPerms, SetPermissionsMixin):
+    objects = serializers.ListField(
+        required=True,
+        allow_empty=False,
+        label="Objects",
+        write_only=True,
+        child=serializers.IntegerField(),
+    )
+
+    object_type = serializers.ChoiceField(
+        choices=[
+            "tags",
+            "correspondents",
+            "document_types",
+            "storage_paths",
+        ],
+        label="Object Type",
+        write_only=True,
+    )
+
+    operation = serializers.ChoiceField(
+        choices=[
+            "set_permissions",
+            "delete",
+        ],
+        label="Operation",
+        required=True,
+        write_only=True,
+    )
+
+    owner = serializers.PrimaryKeyRelatedField(
+        queryset=User.objects.all(),
+        required=False,
+        allow_null=True,
+    )
+
+    permissions = serializers.DictField(
+        label="Set permissions",
+        allow_empty=False,
+        required=False,
+        write_only=True,
+    )
+
+    merge = serializers.BooleanField(
+        default=False,
+        write_only=True,
+        required=False,
+    )
+
+    def get_object_class(self, object_type):
+        object_class = None
+        if object_type == "tags":
+            object_class = Tag
+        elif object_type == "correspondents":
+            object_class = Correspondent
+        elif object_type == "document_types":
+            object_class = DocumentType
+        elif object_type == "storage_paths":
+            object_class = StoragePath
+        return object_class
+
+    def _validate_objects(self, objects, object_type):
+        if not isinstance(objects, list):
+            raise serializers.ValidationError("objects must be a list")
+        if not all(isinstance(i, int) for i in objects):
+            raise serializers.ValidationError("objects must be a list of integers")
+        object_class = self.get_object_class(object_type)
+        count = object_class.objects.filter(id__in=objects).count()
+        if not count == len(objects):
+            raise serializers.ValidationError(
+                "Some ids in objects don't exist or were specified twice.",
+            )
+        return objects
+
+    def _validate_permissions(self, permissions):
+        self.validate_set_permissions(
+            permissions,
+        )
+
+    def validate(self, attrs):
+        object_type = attrs["object_type"]
+        objects = attrs["objects"]
+        operation = attrs.get("operation")
+
+        self._validate_objects(objects, object_type)
+
+        if operation == "set_permissions":
+            permissions = attrs.get("permissions")
+            if permissions is not None:
+                self._validate_permissions(permissions)
+
+        return attrs
+
+
+class WorkflowTriggerSerializer(serializers.ModelSerializer):
+    id = serializers.IntegerField(required=False, allow_null=True)
+    sources = fields.MultipleChoiceField(
+        choices=WorkflowTrigger.DocumentSourceChoices.choices,
+        allow_empty=True,
+        default={
+            DocumentSource.ConsumeFolder,
+            DocumentSource.ApiUpload,
+            DocumentSource.MailFetch,
+        },
+    )
+
+    type = serializers.ChoiceField(
+        choices=WorkflowTrigger.WorkflowTriggerType.choices,
+        label="Trigger Type",
+    )
+
+    class Meta:
+        model = WorkflowTrigger
+        fields = [
+            "id",
+            "sources",
+            "type",
+            "filter_path",
+            "filter_filename",
+            "filter_mailrule",
+            "matching_algorithm",
+            "match",
+            "is_insensitive",
+            "filter_has_tags",
+            "filter_has_correspondent",
+            "filter_has_document_type",
+            "schedule_offset_days",
+            "schedule_is_recurring",
+            "schedule_recurring_interval_days",
+            "schedule_date_field",
+            "schedule_date_custom_field",
+        ]
+
+    def validate(self, attrs):
+        # Empty strings treated as None to avoid unexpected behavior
+        if (
+            "filter_filename" in attrs
+            and attrs["filter_filename"] is not None
+            and len(attrs["filter_filename"]) == 0
+        ):
+            attrs["filter_filename"] = None
+        if (
+            "filter_path" in attrs
+            and attrs["filter_path"] is not None
+            and len(attrs["filter_path"]) == 0
+        ):
+            attrs["filter_path"] = None
+
+        if (
+            attrs["type"] == WorkflowTrigger.WorkflowTriggerType.CONSUMPTION
+            and "filter_mailrule" not in attrs
+            and ("filter_filename" not in attrs or attrs["filter_filename"] is None)
+            and ("filter_path" not in attrs or attrs["filter_path"] is None)
+        ):
+            raise serializers.ValidationError(
+                "File name, path or mail rule filter are required",
+            )
+
+        return attrs
+
+
+class WorkflowActionEmailSerializer(serializers.ModelSerializer):
+    id = serializers.IntegerField(allow_null=True, required=False)
+
+    class Meta:
+        model = WorkflowActionEmail
+        fields = [
+            "id",
+            "subject",
+            "body",
+            "to",
+            "include_document",
+        ]
+
+
+class WorkflowActionWebhookSerializer(serializers.ModelSerializer):
+    id = serializers.IntegerField(allow_null=True, required=False)
+
+    def validate_url(self, url):
+        url_validator(url)
+        return url
+
+    class Meta:
+        model = WorkflowActionWebhook
+        fields = [
+            "id",
+            "url",
+            "use_params",
+            "as_json",
+            "params",
+            "body",
+            "headers",
+            "include_document",
+        ]
+
+
+class WorkflowActionSerializer(serializers.ModelSerializer):
+    id = serializers.IntegerField(required=False, allow_null=True)
+    assign_correspondent = CorrespondentField(allow_null=True, required=False)
+    assign_tags = TagsField(many=True, allow_null=True, required=False)
+    assign_document_type = DocumentTypeField(allow_null=True, required=False)
+    assign_storage_path = StoragePathField(allow_null=True, required=False)
+    email = WorkflowActionEmailSerializer(allow_null=True, required=False)
+    webhook = WorkflowActionWebhookSerializer(allow_null=True, required=False)
+
+    class Meta:
+        model = WorkflowAction
+        fields = [
+            "id",
+            "type",
+            "assign_title",
+            "assign_tags",
+            "assign_correspondent",
+            "assign_document_type",
+            "assign_storage_path",
+            "assign_owner",
+            "assign_view_users",
+            "assign_view_groups",
+            "assign_change_users",
+            "assign_change_groups",
+            "assign_custom_fields",
+            "assign_custom_fields_values",
+            "remove_all_tags",
+            "remove_tags",
+            "remove_all_correspondents",
+            "remove_correspondents",
+            "remove_all_document_types",
+            "remove_document_types",
+            "remove_all_storage_paths",
+            "remove_storage_paths",
+            "remove_custom_fields",
+            "remove_all_custom_fields",
+            "remove_all_owners",
+            "remove_owners",
+            "remove_all_permissions",
+            "remove_view_users",
+            "remove_view_groups",
+            "remove_change_users",
+            "remove_change_groups",
+            "email",
+            "webhook",
+        ]
+
+    def validate(self, attrs):
+        if "assign_title" in attrs and attrs["assign_title"] is not None:
+            if len(attrs["assign_title"]) == 0:
+                # Empty strings treated as None to avoid unexpected behavior
+                attrs["assign_title"] = None
+            else:
+                try:
+                    # test against all placeholders, see consumer.py `parse_doc_title_w_placeholders`
+                    attrs["assign_title"].format(
+                        correspondent="",
+                        document_type="",
+                        added="",
+                        added_year="",
+                        added_year_short="",
+                        added_month="",
+                        added_month_name="",
+                        added_month_name_short="",
+                        added_day="",
+                        added_time="",
+                        owner_username="",
+                        original_filename="",
+                        filename="",
+                        created="",
+                        created_year="",
+                        created_year_short="",
+                        created_month="",
+                        created_month_name="",
+                        created_month_name_short="",
+                        created_day="",
+                        created_time="",
+                    )
+                except (ValueError, KeyError) as e:
+                    raise serializers.ValidationError(
+                        {"assign_title": f'Invalid f-string detected: "{e.args[0]}"'},
+                    )
+
+        if (
+            "type" in attrs
+            and attrs["type"] == WorkflowAction.WorkflowActionType.EMAIL
+            and "email" not in attrs
+        ):
+            raise serializers.ValidationError(
+                "Email data is required for email actions",
+            )
+
+        if (
+            "type" in attrs
+            and attrs["type"] == WorkflowAction.WorkflowActionType.WEBHOOK
+            and "webhook" not in attrs
+        ):
+            raise serializers.ValidationError(
+                "Webhook data is required for webhook actions",
+            )
+
+        return attrs
+
+
+class WorkflowSerializer(serializers.ModelSerializer):
+    order = serializers.IntegerField(required=False)
+
+    triggers = WorkflowTriggerSerializer(many=True)
+    actions = WorkflowActionSerializer(many=True)
+
+    class Meta:
+        model = Workflow
+        fields = [
+            "id",
+            "name",
+            "order",
+            "enabled",
+            "triggers",
+            "actions",
+        ]
+
+    def update_triggers_and_actions(self, instance: Workflow, triggers, actions):
+        set_triggers = []
+        set_actions = []
+
+        if triggers is not None:
+            for trigger in triggers:
+                filter_has_tags = trigger.pop("filter_has_tags", None)
+                trigger_instance, _ = WorkflowTrigger.objects.update_or_create(
+                    id=trigger.get("id"),
+                    defaults=trigger,
+                )
+                if filter_has_tags is not None:
+                    trigger_instance.filter_has_tags.set(filter_has_tags)
+                set_triggers.append(trigger_instance)
+
+        if actions is not None:
+            for action in actions:
+                assign_tags = action.pop("assign_tags", None)
+                assign_view_users = action.pop("assign_view_users", None)
+                assign_view_groups = action.pop("assign_view_groups", None)
+                assign_change_users = action.pop("assign_change_users", None)
+                assign_change_groups = action.pop("assign_change_groups", None)
+                assign_custom_fields = action.pop("assign_custom_fields", None)
+                remove_tags = action.pop("remove_tags", None)
+                remove_correspondents = action.pop("remove_correspondents", None)
+                remove_document_types = action.pop("remove_document_types", None)
+                remove_storage_paths = action.pop("remove_storage_paths", None)
+                remove_custom_fields = action.pop("remove_custom_fields", None)
+                remove_owners = action.pop("remove_owners", None)
+                remove_view_users = action.pop("remove_view_users", None)
+                remove_view_groups = action.pop("remove_view_groups", None)
+                remove_change_users = action.pop("remove_change_users", None)
+                remove_change_groups = action.pop("remove_change_groups", None)
+
+                email_data = action.pop("email", None)
+                webhook_data = action.pop("webhook", None)
+
+                action_instance, _ = WorkflowAction.objects.update_or_create(
+                    id=action.get("id"),
+                    defaults=action,
+                )
+
+                if email_data is not None:
+                    serializer = WorkflowActionEmailSerializer(data=email_data)
+                    serializer.is_valid(raise_exception=True)
+                    email, _ = WorkflowActionEmail.objects.update_or_create(
+                        id=email_data.get("id"),
+                        defaults=serializer.validated_data,
+                    )
+                    action_instance.email = email
+                    action_instance.save()
+
+                if webhook_data is not None:
+                    serializer = WorkflowActionWebhookSerializer(data=webhook_data)
+                    serializer.is_valid(raise_exception=True)
+                    webhook, _ = WorkflowActionWebhook.objects.update_or_create(
+                        id=webhook_data.get("id"),
+                        defaults=serializer.validated_data,
+                    )
+                    action_instance.webhook = webhook
+                    action_instance.save()
+
+                if assign_tags is not None:
+                    action_instance.assign_tags.set(assign_tags)
+                if assign_view_users is not None:
+                    action_instance.assign_view_users.set(assign_view_users)
+                if assign_view_groups is not None:
+                    action_instance.assign_view_groups.set(assign_view_groups)
+                if assign_change_users is not None:
+                    action_instance.assign_change_users.set(assign_change_users)
+                if assign_change_groups is not None:
+                    action_instance.assign_change_groups.set(assign_change_groups)
+                if assign_custom_fields is not None:
+                    action_instance.assign_custom_fields.set(assign_custom_fields)
+                if remove_tags is not None:
+                    action_instance.remove_tags.set(remove_tags)
+                if remove_correspondents is not None:
+                    action_instance.remove_correspondents.set(remove_correspondents)
+                if remove_document_types is not None:
+                    action_instance.remove_document_types.set(remove_document_types)
+                if remove_storage_paths is not None:
+                    action_instance.remove_storage_paths.set(remove_storage_paths)
+                if remove_custom_fields is not None:
+                    action_instance.remove_custom_fields.set(remove_custom_fields)
+                if remove_owners is not None:
+                    action_instance.remove_owners.set(remove_owners)
+                if remove_view_users is not None:
+                    action_instance.remove_view_users.set(remove_view_users)
+                if remove_view_groups is not None:
+                    action_instance.remove_view_groups.set(remove_view_groups)
+                if remove_change_users is not None:
+                    action_instance.remove_change_users.set(remove_change_users)
+                if remove_change_groups is not None:
+                    action_instance.remove_change_groups.set(remove_change_groups)
+
+                set_actions.append(action_instance)
+
+        instance.triggers.set(set_triggers)
+        instance.actions.set(set_actions)
+        instance.save()
+
+    def prune_triggers_and_actions(self):
+        """
+        ManyToMany fields dont support e.g. on_delete so we need to discard unattached
+        triggers and actionas manually
+        """
+        for trigger in WorkflowTrigger.objects.all():
+            if trigger.workflows.all().count() == 0:
+                trigger.delete()
+
+        for action in WorkflowAction.objects.all():
+            if action.workflows.all().count() == 0:
+                action.delete()
+
+        WorkflowActionEmail.objects.filter(action=None).delete()
+        WorkflowActionWebhook.objects.filter(action=None).delete()
+
+    def create(self, validated_data) -> Workflow:
+        if "triggers" in validated_data:
+            triggers = validated_data.pop("triggers")
+
+        if "actions" in validated_data:
+            actions = validated_data.pop("actions")
+
+        instance = super().create(validated_data)
+
+        self.update_triggers_and_actions(instance, triggers, actions)
+
+        return instance
+
+    def update(self, instance: Workflow, validated_data) -> Workflow:
+        if "triggers" in validated_data:
+            triggers = validated_data.pop("triggers")
+
+        if "actions" in validated_data:
+            actions = validated_data.pop("actions")
+
+        instance = super().update(instance, validated_data)
+
+        self.update_triggers_and_actions(instance, triggers, actions)
+
+        self.prune_triggers_and_actions()
+
+        return instance
+
+
+class TrashSerializer(SerializerWithPerms):
+    documents = serializers.ListField(
+        required=False,
+        label="Documents",
+        write_only=True,
+        child=serializers.IntegerField(),
+    )
+
+    action = serializers.ChoiceField(
+        choices=["restore", "empty"],
+        label="Action",
+        write_only=True,
+    )
+
+    def validate_documents(self, documents):
+        count = Document.deleted_objects.filter(id__in=documents).count()
+        if not count == len(documents):
+            raise serializers.ValidationError(
+                "Some documents in the list have not yet been deleted.",
+            )
+        return documents
+
+
+class StoragePathTestSerializer(SerializerWithPerms):
+    path = serializers.CharField(
+        required=True,
+        label="Path",
+        write_only=True,
+    )
+
+    document = serializers.PrimaryKeyRelatedField(
+        queryset=Document.objects.all(),
+        required=True,
+        label="Document",
+        write_only=True,
+    )
index 2cb3b1debfba5fa06f5a557416e373152bc26abf..7d650521e3a22b05ff304a3af3fe3ba72c97458d 100644 (file)
@@ -130,30 +130,6 @@ from documents.permissions import get_objects_for_user_owner_aware
 from documents.permissions import has_perms_owner_aware
 from documents.permissions import set_permissions_for_object
 from documents.schema import generate_object_with_permissions_schema
-from documents.serialisers import AcknowledgeTasksViewSerializer
-from documents.serialisers import BulkDownloadSerializer
-from documents.serialisers import BulkEditObjectsSerializer
-from documents.serialisers import BulkEditSerializer
-from documents.serialisers import CorrespondentSerializer
-from documents.serialisers import CustomFieldSerializer
-from documents.serialisers import DocumentListSerializer
-from documents.serialisers import DocumentSerializer
-from documents.serialisers import DocumentTypeSerializer
-from documents.serialisers import PostDocumentSerializer
-from documents.serialisers import RunTaskViewSerializer
-from documents.serialisers import SavedViewSerializer
-from documents.serialisers import SearchResultSerializer
-from documents.serialisers import ShareLinkSerializer
-from documents.serialisers import StoragePathSerializer
-from documents.serialisers import StoragePathTestSerializer
-from documents.serialisers import TagSerializer
-from documents.serialisers import TagSerializerVersion1
-from documents.serialisers import TasksViewSerializer
-from documents.serialisers import TrashSerializer
-from documents.serialisers import UiSettingsViewSerializer
-from documents.serialisers import WorkflowActionSerializer
-from documents.serialisers import WorkflowSerializer
-from documents.serialisers import WorkflowTriggerSerializer
 from documents.signals import document_updated
 from documents.tasks import consume_file
 from documents.tasks import empty_trash
@@ -189,11 +165,35 @@ from paperless.models import UiSettings
 from paperless.models import Workflow
 from paperless.models import WorkflowAction
 from paperless.models import WorkflowTrigger
+from paperless.serialisers import AcknowledgeTasksViewSerializer
 from paperless.serialisers import ApplicationConfigurationSerializer
+from paperless.serialisers import BulkDownloadSerializer
+from paperless.serialisers import BulkEditObjectsSerializer
+from paperless.serialisers import BulkEditSerializer
+from paperless.serialisers import CorrespondentSerializer
+from paperless.serialisers import CustomFieldSerializer
+from paperless.serialisers import DocumentListSerializer
+from paperless.serialisers import DocumentSerializer
+from paperless.serialisers import DocumentTypeSerializer
 from paperless.serialisers import GroupSerializer
 from paperless.serialisers import PaperlessAuthTokenSerializer
+from paperless.serialisers import PostDocumentSerializer
 from paperless.serialisers import ProfileSerializer
+from paperless.serialisers import RunTaskViewSerializer
+from paperless.serialisers import SavedViewSerializer
+from paperless.serialisers import SearchResultSerializer
+from paperless.serialisers import ShareLinkSerializer
+from paperless.serialisers import StoragePathSerializer
+from paperless.serialisers import StoragePathTestSerializer
+from paperless.serialisers import TagSerializer
+from paperless.serialisers import TagSerializerVersion1
+from paperless.serialisers import TasksViewSerializer
+from paperless.serialisers import TrashSerializer
+from paperless.serialisers import UiSettingsViewSerializer
 from paperless.serialisers import UserSerializer
+from paperless.serialisers import WorkflowActionSerializer
+from paperless.serialisers import WorkflowSerializer
+from paperless.serialisers import WorkflowTriggerSerializer
 from paperless_mail.models import MailAccount
 from paperless_mail.models import MailRule
 from paperless_mail.oauth import PaperlessMailOAuth2Manager
index c7a20acbf93bd04977d63dbfa3435f410cf35691..7e32a55cbb574dd43b2532833d652f867d6d95ea 100644 (file)
@@ -1,25 +1,14 @@
 from rest_framework import serializers
 
-from documents.serialisers import CorrespondentField
-from documents.serialisers import DocumentTypeField
-from documents.serialisers import OwnedObjectSerializer
-from documents.serialisers import TagsField
+from paperless.serialisers import CorrespondentField
+from paperless.serialisers import DocumentTypeField
+from paperless.serialisers import ObfuscatedPasswordField
+from paperless.serialisers import OwnedObjectSerializer
+from paperless.serialisers import TagsField
 from paperless_mail.models import MailAccount
 from paperless_mail.models import MailRule
 
 
-class ObfuscatedPasswordField(serializers.CharField):
-    """
-    Sends *** string instead of password in the clear
-    """
-
-    def to_representation(self, value) -> str:
-        return "*" * max(10, len(value))
-
-    def to_internal_value(self, data):
-        return data
-
-
 class MailAccountSerializer(OwnedObjectSerializer):
     password = ObfuscatedPasswordField()