+++ /dev/null
-from __future__ import annotations
-
-import datetime
-import logging
-import math
-import re
-import zoneinfo
-from decimal import Decimal
-from typing import TYPE_CHECKING
-
-import magic
-from celery import states
-from django.conf import settings
-from django.contrib.auth.models import Group
-from django.contrib.auth.models import User
-from django.contrib.contenttypes.models import ContentType
-from django.core.validators import DecimalValidator
-from django.core.validators import MaxLengthValidator
-from django.core.validators import RegexValidator
-from django.core.validators import integer_validator
-from django.utils.crypto import get_random_string
-from django.utils.text import slugify
-from django.utils.translation import gettext as _
-from drf_spectacular.utils import extend_schema_field
-from drf_writable_nested.serializers import NestedUpdateMixin
-from guardian.core import ObjectPermissionChecker
-from guardian.shortcuts import get_users_with_perms
-from guardian.utils import get_group_obj_perms_model
-from guardian.utils import get_user_obj_perms_model
-from rest_framework import fields
-from rest_framework import serializers
-from rest_framework.fields import SerializerMethodField
-
-if settings.AUDIT_LOG_ENABLED:
- from auditlog.context import set_actor
-
-
-from documents import bulk_edit
-from documents.parsers import is_mime_type_supported
-from documents.permissions import get_groups_with_only_permission
-from documents.permissions import set_permissions_for_object
-from documents.templating.filepath import validate_filepath_template_and_render
-from documents.templating.utils import convert_format_str_to_template_format
-from paperless.data_models import DocumentSource
-from paperless.models import Correspondent
-from paperless.models import CustomField
-from paperless.models import CustomFieldInstance
-from paperless.models import Document
-from paperless.models import DocumentType
-from paperless.models import MatchingModel
-from paperless.models import Note
-from paperless.models import PaperlessTask
-from paperless.models import SavedView
-from paperless.models import SavedViewFilterRule
-from paperless.models import ShareLink
-from paperless.models import StoragePath
-from paperless.models import Tag
-from paperless.models import UiSettings
-from paperless.models import Workflow
-from paperless.models import WorkflowAction
-from paperless.models import WorkflowActionEmail
-from paperless.models import WorkflowActionWebhook
-from paperless.models import WorkflowTrigger
-from paperless.validators import uri_validator
-from paperless.validators import url_validator
-
-if TYPE_CHECKING:
- from collections.abc import Iterable
-
-logger = logging.getLogger("paperless.serializers")
-
-
-# https://www.django-rest-framework.org/api-guide/serializers/#example
-class DynamicFieldsModelSerializer(serializers.ModelSerializer):
- """
- A ModelSerializer that takes an additional `fields` argument that
- controls which fields should be displayed.
- """
-
- def __init__(self, *args, **kwargs):
- # Don't pass the 'fields' arg up to the superclass
- fields = kwargs.pop("fields", None)
-
- # Instantiate the superclass normally
- super().__init__(*args, **kwargs)
-
- if fields is not None:
- # Drop any fields that are not specified in the `fields` argument.
- allowed = set(fields)
- existing = set(self.fields)
- for field_name in existing - allowed:
- self.fields.pop(field_name)
-
-
-class MatchingModelSerializer(serializers.ModelSerializer):
- document_count = serializers.IntegerField(read_only=True)
-
- def get_slug(self, obj) -> str:
- return slugify(obj.name)
-
- slug = SerializerMethodField()
-
- def validate(self, data):
- # TODO: remove pending https://github.com/encode/django-rest-framework/issues/7173
- name = data.get(
- "name",
- self.instance.name if hasattr(self.instance, "name") else None,
- )
-        owner = (
-            data["owner"]
-            if "owner" in data
-            else (self.user if hasattr(self, "user") else None)
-        )
- pk = self.instance.pk if hasattr(self.instance, "pk") else None
- if ("name" in data or "owner" in data) and self.Meta.model.objects.filter(
- name=name,
- owner=owner,
- ).exclude(pk=pk).exists():
- raise serializers.ValidationError(
- {"error": "Object violates owner / name unique constraint"},
- )
- return data
-
- def validate_match(self, match):
- if (
- "matching_algorithm" in self.initial_data
- and self.initial_data["matching_algorithm"] == MatchingModel.MATCH_REGEX
- ):
- try:
- re.compile(match)
- except re.error as e:
- raise serializers.ValidationError(
- _("Invalid regular expression: %(error)s") % {"error": str(e.msg)},
- )
- return match
-
-
-class SetPermissionsMixin:
- def _validate_user_ids(self, user_ids):
- users = User.objects.none()
- if user_ids is not None:
- users = User.objects.filter(id__in=user_ids)
- if not users.count() == len(user_ids):
- raise serializers.ValidationError(
- "Some users in don't exist or were specified twice.",
- )
- return users
-
- def _validate_group_ids(self, group_ids):
- groups = Group.objects.none()
- if group_ids is not None:
- groups = Group.objects.filter(id__in=group_ids)
- if not groups.count() == len(group_ids):
- raise serializers.ValidationError(
- "Some groups in don't exist or were specified twice.",
- )
- return groups
-
- def validate_set_permissions(self, set_permissions=None):
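-        # Expected payload shape (illustrative):
-        #   {"view": {"users": [1, 2], "groups": [3]}, "change": {"users": [4]}}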
- permissions_dict = {
- "view": {},
- "change": {},
- }
- if set_permissions is not None:
- for action in ["view", "change"]:
- if action in set_permissions:
- if "users" in set_permissions[action]:
- users = set_permissions[action]["users"]
- permissions_dict[action]["users"] = self._validate_user_ids(
- users,
- )
- if "groups" in set_permissions[action]:
- groups = set_permissions[action]["groups"]
- permissions_dict[action]["groups"] = self._validate_group_ids(
- groups,
- )
- else:
- del permissions_dict[action]
- return permissions_dict
-
-    def _set_permissions(self, permissions, obj):
-        set_permissions_for_object(permissions, obj)
-
-
-class SerializerWithPerms(serializers.Serializer):
- def __init__(self, *args, **kwargs):
- self.user = kwargs.pop("user", None)
- self.full_perms = kwargs.pop("full_perms", False)
- self.all_fields = kwargs.pop("all_fields", False)
- super().__init__(*args, **kwargs)
-
-
-@extend_schema_field(
- field={
- "type": "object",
- "properties": {
- "view": {
- "type": "object",
- "properties": {
- "users": {
- "type": "array",
- "items": {"type": "integer"},
- },
- "groups": {
- "type": "array",
- "items": {"type": "integer"},
- },
- },
- },
- "change": {
- "type": "object",
- "properties": {
- "users": {
- "type": "array",
- "items": {"type": "integer"},
- },
- "groups": {
- "type": "array",
- "items": {"type": "integer"},
- },
- },
- },
- },
- },
-)
-class SetPermissionsSerializer(serializers.DictField):
- pass
-
-
-class OwnedObjectSerializer(
- SerializerWithPerms,
- serializers.ModelSerializer,
- SetPermissionsMixin,
-):
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
-
- if not self.all_fields:
- try:
- if self.full_perms:
- self.fields.pop("user_can_change")
- self.fields.pop("is_shared_by_requester")
- else:
- self.fields.pop("permissions")
- except KeyError:
- pass
-
- @extend_schema_field(
- field={
- "type": "object",
- "properties": {
- "view": {
- "type": "object",
- "properties": {
- "users": {
- "type": "array",
- "items": {"type": "integer"},
- },
- "groups": {
- "type": "array",
- "items": {"type": "integer"},
- },
- },
- },
- "change": {
- "type": "object",
- "properties": {
- "users": {
- "type": "array",
- "items": {"type": "integer"},
- },
- "groups": {
- "type": "array",
- "items": {"type": "integer"},
- },
- },
- },
- },
- },
- )
- def get_permissions(self, obj) -> dict:
- view_codename = f"view_{obj.__class__.__name__.lower()}"
- change_codename = f"change_{obj.__class__.__name__.lower()}"
-
- return {
- "view": {
- "users": get_users_with_perms(
- obj,
- only_with_perms_in=[view_codename],
- with_group_users=False,
- ).values_list("id", flat=True),
- "groups": get_groups_with_only_permission(
- obj,
- codename=view_codename,
- ).values_list("id", flat=True),
- },
- "change": {
- "users": get_users_with_perms(
- obj,
- only_with_perms_in=[change_codename],
- with_group_users=False,
- ).values_list("id", flat=True),
- "groups": get_groups_with_only_permission(
- obj,
- codename=change_codename,
- ).values_list("id", flat=True),
- },
- }
-
- def get_user_can_change(self, obj) -> bool:
- checker = ObjectPermissionChecker(self.user) if self.user is not None else None
- return (
- obj.owner is None
- or obj.owner == self.user
- or (
- self.user is not None
- and checker.has_perm(f"change_{obj.__class__.__name__.lower()}", obj)
- )
- )
-
- @staticmethod
- def get_shared_object_pks(objects: Iterable):
- """
- Return the primary keys of the subset of objects that are shared.
- """
- try:
- first_obj = next(iter(objects))
- except StopIteration:
- return set()
-
- ctype = ContentType.objects.get_for_model(first_obj)
- object_pks = list(obj.pk for obj in objects)
- pk_type = type(first_obj.pk)
-
- def get_pks_for_permission_type(model):
- return map(
-                pk_type,  # guardian stores object_pk as a string; coerce back to the pk type of the provided objects
- model.objects.filter(
- content_type=ctype,
- object_pk__in=object_pks,
- )
- .values_list("object_pk", flat=True)
- .distinct(),
- )
-
- UserObjectPermission = get_user_obj_perms_model()
- GroupObjectPermission = get_group_obj_perms_model()
- user_permission_pks = get_pks_for_permission_type(UserObjectPermission)
- group_permission_pks = get_pks_for_permission_type(GroupObjectPermission)
-
- return set(user_permission_pks) | set(group_permission_pks)
-
- def get_is_shared_by_requester(self, obj: Document) -> bool:
- # First check the context to see if `shared_object_pks` is set by the parent.
- shared_object_pks = self.context.get("shared_object_pks")
- # If not just check if the current object is shared.
- if shared_object_pks is None:
- shared_object_pks = self.get_shared_object_pks([obj])
- return obj.owner == self.user and obj.id in shared_object_pks
-
- permissions = SerializerMethodField(read_only=True)
- user_can_change = SerializerMethodField(read_only=True)
- is_shared_by_requester = SerializerMethodField(read_only=True)
-
- set_permissions = SetPermissionsSerializer(
- label="Set permissions",
- allow_empty=True,
- required=False,
- write_only=True,
- )
-    # validate_set_permissions and _set_permissions are provided by SetPermissionsMixin
-
- def validate_unique_together(self, validated_data, instance=None):
- # workaround for https://github.com/encode/django-rest-framework/issues/9358
- if "owner" in validated_data and "name" in self.Meta.fields:
- name = validated_data.get("name", instance.name if instance else None)
- objects = (
- self.Meta.model.objects.exclude(pk=instance.pk)
- if instance
- else self.Meta.model.objects.all()
- )
- not_unique = objects.filter(
- owner=validated_data["owner"],
- name=name,
- ).exists()
- if not_unique:
- raise serializers.ValidationError(
- {"error": "Object violates owner / name unique constraint"},
- )
-
- def create(self, validated_data):
- # default to current user if not set
- request = self.context.get("request")
- if (
- "owner" not in validated_data
- or (request is not None and "owner" not in request.data)
- ) and self.user:
- validated_data["owner"] = self.user
- permissions = None
- if "set_permissions" in validated_data:
- permissions = validated_data.pop("set_permissions")
- self.validate_unique_together(validated_data)
- instance = super().create(validated_data)
- if permissions is not None:
- self._set_permissions(permissions, instance)
- return instance
-
- def update(self, instance, validated_data):
- if "set_permissions" in validated_data:
- self._set_permissions(validated_data["set_permissions"], instance)
- self.validate_unique_together(validated_data, instance)
- return super().update(instance, validated_data)
-
-
-class OwnedObjectListSerializer(serializers.ListSerializer):
- def to_representation(self, documents):
- self.child.context["shared_object_pks"] = self.child.get_shared_object_pks(
- documents,
- )
- return super().to_representation(documents)
-
-
-class CorrespondentSerializer(MatchingModelSerializer, OwnedObjectSerializer):
- last_correspondence = serializers.DateTimeField(read_only=True, required=False)
-
- class Meta:
- model = Correspondent
- fields = (
- "id",
- "slug",
- "name",
- "match",
- "matching_algorithm",
- "is_insensitive",
- "document_count",
- "last_correspondence",
- "owner",
- "permissions",
- "user_can_change",
- "set_permissions",
- )
-
-
-class DocumentTypeSerializer(MatchingModelSerializer, OwnedObjectSerializer):
- class Meta:
- model = DocumentType
- fields = (
- "id",
- "slug",
- "name",
- "match",
- "matching_algorithm",
- "is_insensitive",
- "document_count",
- "owner",
- "permissions",
- "user_can_change",
- "set_permissions",
- )
-
-
-class DeprecatedColors:
- COLOURS = (
- (1, "#a6cee3"),
- (2, "#1f78b4"),
- (3, "#b2df8a"),
- (4, "#33a02c"),
- (5, "#fb9a99"),
- (6, "#e31a1c"),
- (7, "#fdbf6f"),
- (8, "#ff7f00"),
- (9, "#cab2d6"),
- (10, "#6a3d9a"),
- (11, "#b15928"),
- (12, "#000000"),
- (13, "#cccccc"),
- )
-
-
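-# Maps the legacy integer colour ids used by the v1 tags API to hex values and back.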
-@extend_schema_field(
- serializers.ChoiceField(
- choices=DeprecatedColors.COLOURS,
- ),
-)
-class ColorField(serializers.Field):
- def to_internal_value(self, data):
- for id, color in DeprecatedColors.COLOURS:
- if id == data:
- return color
-        raise serializers.ValidationError(_("Invalid color."))
-
- def to_representation(self, value):
- for id, color in DeprecatedColors.COLOURS:
- if color == value:
- return id
- return 1
-
-
-class TagSerializerVersion1(MatchingModelSerializer, OwnedObjectSerializer):
- colour = ColorField(source="color", default="#a6cee3")
-
- class Meta:
- model = Tag
- fields = (
- "id",
- "slug",
- "name",
- "colour",
- "match",
- "matching_algorithm",
- "is_insensitive",
- "is_inbox_tag",
- "document_count",
- "owner",
- "permissions",
- "user_can_change",
- "set_permissions",
- )
-
-
-class TagSerializer(MatchingModelSerializer, OwnedObjectSerializer):
- def get_text_color(self, obj) -> str:
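-        # Perceived luminance from the Rec. 601 luma coefficients; dark colors get
-        # white text, light colors get black text.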
- try:
- h = obj.color.lstrip("#")
- rgb = tuple(int(h[i : i + 2], 16) / 256 for i in (0, 2, 4))
- luminance = math.sqrt(
- 0.299 * math.pow(rgb[0], 2)
- + 0.587 * math.pow(rgb[1], 2)
- + 0.114 * math.pow(rgb[2], 2),
- )
- return "#ffffff" if luminance < 0.53 else "#000000"
- except ValueError:
- return "#000000"
-
- text_color = serializers.SerializerMethodField()
-
- class Meta:
- model = Tag
- fields = (
- "id",
- "slug",
- "name",
- "color",
- "text_color",
- "match",
- "matching_algorithm",
- "is_insensitive",
- "is_inbox_tag",
- "document_count",
- "owner",
- "permissions",
- "user_can_change",
- "set_permissions",
- )
-
- def validate_color(self, color):
- regex = r"#[0-9a-fA-F]{6}"
- if not re.match(regex, color):
- raise serializers.ValidationError(_("Invalid color."))
- return color
-
-
-class CorrespondentField(serializers.PrimaryKeyRelatedField):
- def get_queryset(self):
- return Correspondent.objects.all()
-
-
-class TagsField(serializers.PrimaryKeyRelatedField):
- def get_queryset(self):
- return Tag.objects.all()
-
-
-class DocumentTypeField(serializers.PrimaryKeyRelatedField):
- def get_queryset(self):
- return DocumentType.objects.all()
-
-
-class StoragePathField(serializers.PrimaryKeyRelatedField):
- def get_queryset(self):
- return StoragePath.objects.all()
-
-
-class CustomFieldSerializer(serializers.ModelSerializer):
- def __init__(self, *args, **kwargs):
- context = kwargs.get("context")
- self.api_version = int(
- context.get("request").version
- if context and context.get("request")
- else settings.REST_FRAMEWORK["DEFAULT_VERSION"],
- )
- super().__init__(*args, **kwargs)
-
- data_type = serializers.ChoiceField(
- choices=CustomField.FieldDataType,
- read_only=False,
- )
-
- document_count = serializers.IntegerField(read_only=True)
-
- class Meta:
- model = CustomField
- fields = [
- "id",
- "name",
- "data_type",
- "extra_data",
- "document_count",
- ]
-
- def validate(self, attrs):
- # TODO: remove pending https://github.com/encode/django-rest-framework/issues/7173
- name = attrs.get(
- "name",
- self.instance.name if hasattr(self.instance, "name") else None,
- )
- objects = (
- self.Meta.model.objects.exclude(
- pk=self.instance.pk,
- )
- if self.instance is not None
- else self.Meta.model.objects.all()
- )
- if ("name" in attrs) and objects.filter(
- name=name,
- ).exists():
- raise serializers.ValidationError(
- {"error": "Object violates name unique constraint"},
- )
- if (
- "data_type" in attrs
- and attrs["data_type"] == CustomField.FieldDataType.SELECT
- ) or (
- self.instance
- and self.instance.data_type == CustomField.FieldDataType.SELECT
- ):
- if (
- "extra_data" not in attrs
- or "select_options" not in attrs["extra_data"]
- or not isinstance(attrs["extra_data"]["select_options"], list)
- or len(attrs["extra_data"]["select_options"]) == 0
- or not all(
- len(option.get("label", "")) > 0
- for option in attrs["extra_data"]["select_options"]
- )
- ):
- raise serializers.ValidationError(
- {"error": "extra_data.select_options must be a valid list"},
- )
- # labels are valid, generate ids if not present
- for option in attrs["extra_data"]["select_options"]:
- if option.get("id") is None:
- option["id"] = get_random_string(length=16)
- elif (
- "data_type" in attrs
- and attrs["data_type"] == CustomField.FieldDataType.MONETARY
- and "extra_data" in attrs
- and "default_currency" in attrs["extra_data"]
- and attrs["extra_data"]["default_currency"] is not None
- and (
- not isinstance(attrs["extra_data"]["default_currency"], str)
- or (
- len(attrs["extra_data"]["default_currency"]) > 0
- and len(attrs["extra_data"]["default_currency"]) != 3
- )
- )
- ):
- raise serializers.ValidationError(
- {"error": "extra_data.default_currency must be a 3-character string"},
- )
- return super().validate(attrs)
-
- def to_internal_value(self, data):
- ret = super().to_internal_value(data)
-
- if (
- self.api_version < 7
- and ret.get("data_type", "") == CustomField.FieldDataType.SELECT
- and isinstance(ret.get("extra_data", {}).get("select_options"), list)
- ):
- ret["extra_data"]["select_options"] = [
- {
- "label": option,
- "id": get_random_string(length=16),
- }
- for option in ret["extra_data"]["select_options"]
- ]
-
- return ret
-
- def to_representation(self, instance):
- ret = super().to_representation(instance)
-
- if (
- self.api_version < 7
- and instance.data_type == CustomField.FieldDataType.SELECT
- ):
- # Convert the select options with ids to a list of strings
- ret["extra_data"]["select_options"] = [
- option["label"] for option in ret["extra_data"]["select_options"]
- ]
-
- return ret
-
-
-class ReadWriteSerializerMethodField(serializers.SerializerMethodField):
- """
- Based on https://stackoverflow.com/a/62579804
- """
-
- def __init__(self, method_name=None, *args, **kwargs):
- self.method_name = method_name
- kwargs["source"] = "*"
- super(serializers.SerializerMethodField, self).__init__(*args, **kwargs)
-
- def to_internal_value(self, data):
- return {self.field_name: data}
-
-
-class CustomFieldInstanceSerializer(serializers.ModelSerializer):
- field = serializers.PrimaryKeyRelatedField(queryset=CustomField.objects.all())
- value = ReadWriteSerializerMethodField(allow_null=True)
-
- def create(self, validated_data):
- # An instance is attached to a document
- document: Document = validated_data["document"]
- # And to a CustomField
- custom_field: CustomField = validated_data["field"]
- # This key must exist, as it is validated
- data_store_name = CustomFieldInstance.get_value_field_name(
- custom_field.data_type,
- )
-
- if custom_field.data_type == CustomField.FieldDataType.DOCUMENTLINK:
- # prior to update so we can look for any docs that are going to be removed
- bulk_edit.reflect_doclinks(document, custom_field, validated_data["value"])
-
- # Actually update or create the instance, providing the value
- # to fill in the correct attribute based on the type
- instance, _ = CustomFieldInstance.objects.update_or_create(
- document=document,
- field=custom_field,
- defaults={data_store_name: validated_data["value"]},
- )
- return instance
-
- def get_value(self, obj: CustomFieldInstance) -> str | int | float | dict | None:
- return obj.value
-
- def validate(self, data):
- """
-        Most likely because "value" is exposed through a write-enabled
-        SerializerMethodField rather than a plain model field, model-level
-        validation does not run against it, so the relevant checks are
-        re-created here.
-
-        Not pretty, but better than returning an HTTP 500 when the database
-        rejects the value.
- """
- data = super().validate(data)
- field: CustomField = data["field"]
- if "value" in data and data["value"] is not None:
- if (
- field.data_type == CustomField.FieldDataType.URL
- and len(data["value"]) > 0
- ):
- uri_validator(data["value"])
- elif field.data_type == CustomField.FieldDataType.INT:
- integer_validator(data["value"])
- elif (
- field.data_type == CustomField.FieldDataType.MONETARY
- and data["value"] != ""
- ):
- try:
- # First try to validate as a number from legacy format
- DecimalValidator(max_digits=12, decimal_places=2)(
- Decimal(str(data["value"])),
- )
- except Exception:
- # If that fails, try to validate as a monetary string
- RegexValidator(
- regex=r"^[A-Z]{3}-?\d+(\.\d{1,2})$",
- message="Must be a two-decimal number with optional currency code e.g. GBP123.45",
- )(data["value"])
- elif field.data_type == CustomField.FieldDataType.STRING:
- MaxLengthValidator(limit_value=128)(data["value"])
- elif field.data_type == CustomField.FieldDataType.SELECT:
- select_options = field.extra_data["select_options"]
- try:
- next(
- option
- for option in select_options
- if option["id"] == data["value"]
- )
- except Exception:
- raise serializers.ValidationError(
- f"Value must be an id of an element in {select_options}",
- )
- elif field.data_type == CustomField.FieldDataType.DOCUMENTLINK:
- if not (isinstance(data["value"], list) or data["value"] is None):
- raise serializers.ValidationError(
- "Value must be a list",
- )
- doc_ids = data["value"]
- if Document.objects.filter(id__in=doc_ids).count() != len(
- data["value"],
- ):
- raise serializers.ValidationError(
- "Some documents in value don't exist or were specified twice.",
- )
-
- return data
-
- def get_api_version(self):
- return int(
- self.context.get("request").version
- if self.context.get("request")
- else settings.REST_FRAMEWORK["DEFAULT_VERSION"],
- )
-
- def to_internal_value(self, data):
- ret = super().to_internal_value(data)
-
- if (
- self.get_api_version() < 7
- and ret.get("field").data_type == CustomField.FieldDataType.SELECT
- and ret.get("value") is not None
- ):
- # Convert the index of the option in the field.extra_data["select_options"]
- # list to the options unique id
- ret["value"] = ret.get("field").extra_data["select_options"][ret["value"]][
- "id"
- ]
-
- return ret
-
- def to_representation(self, instance):
- ret = super().to_representation(instance)
-
- if (
- self.get_api_version() < 7
- and instance.field.data_type == CustomField.FieldDataType.SELECT
- ):
- # return the index of the option in the field.extra_data["select_options"] list
- ret["value"] = next(
- (
- idx
- for idx, option in enumerate(
- instance.field.extra_data["select_options"],
- )
- if option["id"] == instance.value
- ),
- None,
- )
-
- return ret
-
- class Meta:
- model = CustomFieldInstance
- fields = [
- "value",
- "field",
- ]
-
-
-class BasicUserSerializer(serializers.ModelSerializer):
- # Different than paperless.serializers.UserSerializer
- class Meta:
- model = User
- fields = ["id", "username", "first_name", "last_name"]
-
-
-class NotesSerializer(serializers.ModelSerializer):
- user = BasicUserSerializer(read_only=True)
-
- class Meta:
- model = Note
- fields = ["id", "note", "created", "user"]
- ordering = ["-created"]
-
-
-class DocumentSerializer(
- OwnedObjectSerializer,
- NestedUpdateMixin,
- DynamicFieldsModelSerializer,
-):
- correspondent = CorrespondentField(allow_null=True)
- tags = TagsField(many=True)
- document_type = DocumentTypeField(allow_null=True)
- storage_path = StoragePathField(allow_null=True)
-
- original_file_name = SerializerMethodField()
- archived_file_name = SerializerMethodField()
- created_date = serializers.DateField(required=False)
- page_count = SerializerMethodField()
-
- notes = NotesSerializer(many=True, required=False, read_only=True)
-
- custom_fields = CustomFieldInstanceSerializer(
- many=True,
- allow_null=False,
- required=False,
- )
-
- owner = serializers.PrimaryKeyRelatedField(
- queryset=User.objects.all(),
- required=False,
- allow_null=True,
- )
-
- remove_inbox_tags = serializers.BooleanField(
- default=False,
- write_only=True,
- allow_null=True,
- required=False,
- )
-
- def get_page_count(self, obj) -> int | None:
- return obj.page_count
-
- def get_original_file_name(self, obj) -> str | None:
- return obj.original_filename
-
- def get_archived_file_name(self, obj) -> str | None:
- if obj.has_archive_version:
- return obj.get_public_filename(archive=True)
- else:
- return None
-
- def to_representation(self, instance):
- doc = super().to_representation(instance)
- if self.truncate_content and "content" in self.fields:
- doc["content"] = doc.get("content")[0:550]
- return doc
-
- def validate(self, attrs):
- if (
- "archive_serial_number" in attrs
- and attrs["archive_serial_number"] is not None
- and len(str(attrs["archive_serial_number"])) > 0
- and Document.deleted_objects.filter(
- archive_serial_number=attrs["archive_serial_number"],
- ).exists()
- ):
- raise serializers.ValidationError(
- {
- "archive_serial_number": [
- "Document with this Archive Serial Number already exists in the trash.",
- ],
- },
- )
- return super().validate(attrs)
-
- def update(self, instance: Document, validated_data):
- if "created_date" in validated_data and "created" not in validated_data:
- new_datetime = datetime.datetime.combine(
- validated_data.get("created_date"),
- datetime.time(0, 0, 0, 0, zoneinfo.ZoneInfo(settings.TIME_ZONE)),
- )
- instance.created = new_datetime
- instance.save()
- if "created_date" in validated_data:
- validated_data.pop("created_date")
- if instance.custom_fields.count() > 0 and "custom_fields" in validated_data:
- incoming_custom_fields = [
- field["field"] for field in validated_data["custom_fields"]
- ]
- for custom_field_instance in instance.custom_fields.filter(
- field__data_type=CustomField.FieldDataType.DOCUMENTLINK,
- ):
- if (
- custom_field_instance.field not in incoming_custom_fields
- and custom_field_instance.value is not None
- ):
- # Doc link field is being removed entirely
- for doc_id in custom_field_instance.value:
- bulk_edit.remove_doclink(
- instance,
- custom_field_instance.field,
- doc_id,
- )
- if validated_data.get("remove_inbox_tags"):
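-            # Strip inbox tags from the final tag set, but keep any inbox tag that
-            # this request is explicitly adding.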
- tag_ids_being_added = (
- [
- tag.id
- for tag in validated_data["tags"]
- if tag not in instance.tags.all()
- ]
- if "tags" in validated_data
- else []
- )
- inbox_tags_not_being_added = Tag.objects.filter(is_inbox_tag=True).exclude(
- id__in=tag_ids_being_added,
- )
- if "tags" in validated_data:
- validated_data["tags"] = [
- tag
- for tag in validated_data["tags"]
- if tag not in inbox_tags_not_being_added
- ]
- else:
- validated_data["tags"] = [
- tag
- for tag in instance.tags.all()
- if tag not in inbox_tags_not_being_added
- ]
- if settings.AUDIT_LOG_ENABLED:
- with set_actor(self.user):
- super().update(instance, validated_data)
- else:
- super().update(instance, validated_data)
- # hard delete custom field instances that were soft deleted
- CustomFieldInstance.deleted_objects.filter(document=instance).delete()
- return instance
-
- def __init__(self, *args, **kwargs):
- self.truncate_content = kwargs.pop("truncate_content", False)
-
- # return full permissions if we're doing a PATCH or PUT
-        context = kwargs.get("context")
-        request = context.get("request") if context is not None else None
-        if request is not None and request.method in ("PATCH", "PUT"):
- kwargs["full_perms"] = True
-
- super().__init__(*args, **kwargs)
-
- class Meta:
- model = Document
- fields = (
- "id",
- "correspondent",
- "document_type",
- "storage_path",
- "title",
- "content",
- "tags",
- "created",
- "created_date",
- "modified",
- "added",
- "deleted_at",
- "archive_serial_number",
- "original_file_name",
- "archived_file_name",
- "owner",
- "permissions",
- "user_can_change",
- "is_shared_by_requester",
- "set_permissions",
- "notes",
- "custom_fields",
- "remove_inbox_tags",
- "page_count",
- "mime_type",
- )
- list_serializer_class = OwnedObjectListSerializer
-
-
-class SearchResultListSerializer(serializers.ListSerializer):
- def to_representation(self, hits):
- document_ids = [hit["id"] for hit in hits]
- # Fetch all Document objects in the list in one SQL query.
- documents = self.child.fetch_documents(document_ids)
- self.child.context["documents"] = documents
- # Also check if they are shared with other users / groups.
- self.child.context["shared_object_pks"] = self.child.get_shared_object_pks(
- documents.values(),
- )
-
- return super().to_representation(hits)
-
-
-class SearchResultSerializer(DocumentSerializer):
- @staticmethod
- def fetch_documents(ids):
- """
- Return a dict that maps given document IDs to Document objects.
- """
- return {
- document.id: document
- for document in Document.objects.select_related(
- "correspondent",
- "storage_path",
- "document_type",
- "owner",
- )
- .prefetch_related("tags", "custom_fields", "notes")
- .filter(id__in=ids)
- }
-
- def to_representation(self, hit):
- # Again we first check if the parent has already fetched the documents.
- documents = self.context.get("documents")
- # Otherwise we fetch this document.
- if documents is None: # pragma: no cover
- # In practice we only serialize **lists** of whoosh.searching.Hit.
- # I'm keeping this check for completeness but marking it no cover for now.
- documents = self.fetch_documents([hit["id"]])
- document = documents[hit["id"]]
-
- notes = ",".join(
- [str(c.note) for c in document.notes.all()],
- )
- r = super().to_representation(document)
- r["__search_hit__"] = {
- "score": hit.score,
- "highlights": hit.highlights("content", text=document.content),
- "note_highlights": (
- hit.highlights("notes", text=notes) if document else None
- ),
- "rank": hit.rank,
- }
-
- return r
-
- class Meta(DocumentSerializer.Meta):
- list_serializer_class = SearchResultListSerializer
-
-
-class SavedViewFilterRuleSerializer(serializers.ModelSerializer):
- class Meta:
- model = SavedViewFilterRule
- fields = ["rule_type", "value"]
-
-
-class SavedViewSerializer(OwnedObjectSerializer):
- filter_rules = SavedViewFilterRuleSerializer(many=True)
-
- class Meta:
- model = SavedView
- fields = [
- "id",
- "name",
- "show_on_dashboard",
- "show_in_sidebar",
- "sort_field",
- "sort_reverse",
- "filter_rules",
- "page_size",
- "display_mode",
- "display_fields",
- "owner",
- "permissions",
- "user_can_change",
- "set_permissions",
- ]
-
- def validate(self, attrs):
- attrs = super().validate(attrs)
- if "display_fields" in attrs and attrs["display_fields"] is not None:
- for field in attrs["display_fields"]:
- if (
- SavedView.DisplayFields.CUSTOM_FIELD[:-2] in field
- ): # i.e. check for 'custom_field_' prefix
- field_id = int(re.search(r"\d+", field)[0])
- if not CustomField.objects.filter(id=field_id).exists():
- raise serializers.ValidationError(
- f"Invalid field: {field}",
- )
- elif field not in SavedView.DisplayFields.values:
- raise serializers.ValidationError(
- f"Invalid field: {field}",
- )
- return attrs
-
- def update(self, instance, validated_data):
- if "filter_rules" in validated_data:
- rules_data = validated_data.pop("filter_rules")
- else:
- rules_data = None
- if "user" in validated_data:
- # backwards compatibility
- validated_data["owner"] = validated_data.pop("user")
- if (
- "display_fields" in validated_data
- and isinstance(
- validated_data["display_fields"],
- list,
- )
- and len(validated_data["display_fields"]) == 0
- ):
- validated_data["display_fields"] = None
- super().update(instance, validated_data)
- if rules_data is not None:
- SavedViewFilterRule.objects.filter(saved_view=instance).delete()
- for rule_data in rules_data:
- SavedViewFilterRule.objects.create(saved_view=instance, **rule_data)
- return instance
-
- def create(self, validated_data):
- rules_data = validated_data.pop("filter_rules")
- if "user" in validated_data:
- # backwards compatibility
- validated_data["owner"] = validated_data.pop("user")
- saved_view = SavedView.objects.create(**validated_data)
- for rule_data in rules_data:
- SavedViewFilterRule.objects.create(saved_view=saved_view, **rule_data)
- return saved_view
-
-
-class DocumentListSerializer(serializers.Serializer):
- documents = serializers.ListField(
- required=True,
- label="Documents",
- write_only=True,
- child=serializers.IntegerField(),
- )
-
- def _validate_document_id_list(self, documents, name="documents"):
- if not isinstance(documents, list):
- raise serializers.ValidationError(f"{name} must be a list")
- if not all(isinstance(i, int) for i in documents):
- raise serializers.ValidationError(f"{name} must be a list of integers")
- count = Document.objects.filter(id__in=documents).count()
- if not count == len(documents):
- raise serializers.ValidationError(
- f"Some documents in {name} don't exist or were specified twice.",
- )
-
- def validate_documents(self, documents):
- self._validate_document_id_list(documents)
- return documents
-
-
-class BulkEditSerializer(
- SerializerWithPerms,
- DocumentListSerializer,
- SetPermissionsMixin,
-):
- method = serializers.ChoiceField(
- choices=[
- "set_correspondent",
- "set_document_type",
- "set_storage_path",
- "add_tag",
- "remove_tag",
- "modify_tags",
- "modify_custom_fields",
- "delete",
- "reprocess",
- "set_permissions",
- "rotate",
- "merge",
- "split",
- "delete_pages",
- ],
- label="Method",
- write_only=True,
- )
-
- parameters = serializers.DictField(allow_empty=True, default={}, write_only=True)
-
- def _validate_tag_id_list(self, tags, name="tags"):
- if not isinstance(tags, list):
- raise serializers.ValidationError(f"{name} must be a list")
- if not all(isinstance(i, int) for i in tags):
- raise serializers.ValidationError(f"{name} must be a list of integers")
- count = Tag.objects.filter(id__in=tags).count()
- if not count == len(tags):
- raise serializers.ValidationError(
- f"Some tags in {name} don't exist or were specified twice.",
- )
-
- def _validate_custom_field_id_list_or_dict(
- self,
- custom_fields,
- name="custom_fields",
- ):
- ids = custom_fields
- if isinstance(custom_fields, dict):
- try:
-                ids = [int(i) for i in custom_fields.keys()]
- except Exception as e:
- logger.exception(f"Error validating custom fields: {e}")
- raise serializers.ValidationError(
- f"{name} must be a list of integers or a dict of id:value pairs, see the log for details",
- )
- elif not isinstance(custom_fields, list) or not all(
- isinstance(i, int) for i in ids
- ):
- raise serializers.ValidationError(
- f"{name} must be a list of integers or a dict of id:value pairs",
- )
- count = CustomField.objects.filter(id__in=ids).count()
- if not count == len(ids):
- raise serializers.ValidationError(
- f"Some custom fields in {name} don't exist or were specified twice.",
- )
-
- def validate_method(self, method):
- if method == "set_correspondent":
- return bulk_edit.set_correspondent
- elif method == "set_document_type":
- return bulk_edit.set_document_type
- elif method == "set_storage_path":
- return bulk_edit.set_storage_path
- elif method == "add_tag":
- return bulk_edit.add_tag
- elif method == "remove_tag":
- return bulk_edit.remove_tag
- elif method == "modify_tags":
- return bulk_edit.modify_tags
- elif method == "modify_custom_fields":
- return bulk_edit.modify_custom_fields
- elif method == "delete":
- return bulk_edit.delete
- elif method == "redo_ocr" or method == "reprocess":
- return bulk_edit.reprocess
- elif method == "set_permissions":
- return bulk_edit.set_permissions
- elif method == "rotate":
- return bulk_edit.rotate
- elif method == "merge":
- return bulk_edit.merge
- elif method == "split":
- return bulk_edit.split
- elif method == "delete_pages":
- return bulk_edit.delete_pages
- else:
- raise serializers.ValidationError("Unsupported method.")
-
- def _validate_parameters_tags(self, parameters):
- if "tag" in parameters:
- tag_id = parameters["tag"]
- try:
- Tag.objects.get(id=tag_id)
- except Tag.DoesNotExist:
- raise serializers.ValidationError("Tag does not exist")
- else:
- raise serializers.ValidationError("tag not specified")
-
- def _validate_parameters_document_type(self, parameters):
- if "document_type" in parameters:
- document_type_id = parameters["document_type"]
- if document_type_id is None:
- # None is ok
- return
- try:
- DocumentType.objects.get(id=document_type_id)
- except DocumentType.DoesNotExist:
- raise serializers.ValidationError("Document type does not exist")
- else:
- raise serializers.ValidationError("document_type not specified")
-
- def _validate_parameters_correspondent(self, parameters):
- if "correspondent" in parameters:
- correspondent_id = parameters["correspondent"]
- if correspondent_id is None:
- return
- try:
- Correspondent.objects.get(id=correspondent_id)
- except Correspondent.DoesNotExist:
- raise serializers.ValidationError("Correspondent does not exist")
- else:
- raise serializers.ValidationError("correspondent not specified")
-
- def _validate_storage_path(self, parameters):
- if "storage_path" in parameters:
- storage_path_id = parameters["storage_path"]
- if storage_path_id is None:
- return
- try:
- StoragePath.objects.get(id=storage_path_id)
- except StoragePath.DoesNotExist:
- raise serializers.ValidationError(
- "Storage path does not exist",
- )
- else:
- raise serializers.ValidationError("storage path not specified")
-
- def _validate_parameters_modify_tags(self, parameters):
- if "add_tags" in parameters:
- self._validate_tag_id_list(parameters["add_tags"], "add_tags")
- else:
- raise serializers.ValidationError("add_tags not specified")
-
- if "remove_tags" in parameters:
- self._validate_tag_id_list(parameters["remove_tags"], "remove_tags")
- else:
- raise serializers.ValidationError("remove_tags not specified")
-
- def _validate_parameters_modify_custom_fields(self, parameters):
- if "add_custom_fields" in parameters:
- self._validate_custom_field_id_list_or_dict(
- parameters["add_custom_fields"],
- "add_custom_fields",
- )
- else:
- raise serializers.ValidationError("add_custom_fields not specified")
-
- if "remove_custom_fields" in parameters:
- self._validate_custom_field_id_list_or_dict(
- parameters["remove_custom_fields"],
- "remove_custom_fields",
- )
- else:
- raise serializers.ValidationError("remove_custom_fields not specified")
-
-    def _validate_owner(self, owner):
-        try:
-            return User.objects.get(pk=owner)
-        except User.DoesNotExist:
-            raise serializers.ValidationError("Specified owner cannot be found")
-
- def _validate_parameters_set_permissions(self, parameters):
- parameters["set_permissions"] = self.validate_set_permissions(
- parameters["set_permissions"],
- )
- if "owner" in parameters and parameters["owner"] is not None:
- self._validate_owner(parameters["owner"])
- if "merge" not in parameters:
- parameters["merge"] = False
-
- def _validate_parameters_rotate(self, parameters):
- try:
- if (
- "degrees" not in parameters
- or not float(parameters["degrees"]).is_integer()
- ):
- raise serializers.ValidationError("invalid rotation degrees")
- except ValueError:
- raise serializers.ValidationError("invalid rotation degrees")
-
- def _validate_parameters_split(self, parameters):
- if "pages" not in parameters:
- raise serializers.ValidationError("pages not specified")
- try:
- pages = []
- docs = parameters["pages"].split(",")
- for doc in docs:
- if "-" in doc:
- pages.append(
- [
- x
- for x in range(
- int(doc.split("-")[0]),
- int(doc.split("-")[1]) + 1,
- )
- ],
- )
- else:
- pages.append([int(doc)])
- parameters["pages"] = pages
- except ValueError:
- raise serializers.ValidationError("invalid pages specified")
-
- if "delete_originals" in parameters:
- if not isinstance(parameters["delete_originals"], bool):
- raise serializers.ValidationError("delete_originals must be a boolean")
- else:
- parameters["delete_originals"] = False
-
- def _validate_parameters_delete_pages(self, parameters):
- if "pages" not in parameters:
- raise serializers.ValidationError("pages not specified")
- if not isinstance(parameters["pages"], list):
- raise serializers.ValidationError("pages must be a list")
- if not all(isinstance(i, int) for i in parameters["pages"]):
- raise serializers.ValidationError("pages must be a list of integers")
-
- def _validate_parameters_merge(self, parameters):
- if "delete_originals" in parameters:
- if not isinstance(parameters["delete_originals"], bool):
- raise serializers.ValidationError("delete_originals must be a boolean")
- else:
- parameters["delete_originals"] = False
- if "archive_fallback" in parameters:
- if not isinstance(parameters["archive_fallback"], bool):
- raise serializers.ValidationError("archive_fallback must be a boolean")
- else:
- parameters["archive_fallback"] = False
-
- def validate(self, attrs):
- method = attrs["method"]
- parameters = attrs["parameters"]
-
- if method == bulk_edit.set_correspondent:
- self._validate_parameters_correspondent(parameters)
- elif method == bulk_edit.set_document_type:
- self._validate_parameters_document_type(parameters)
- elif method == bulk_edit.add_tag or method == bulk_edit.remove_tag:
- self._validate_parameters_tags(parameters)
- elif method == bulk_edit.modify_tags:
- self._validate_parameters_modify_tags(parameters)
- elif method == bulk_edit.set_storage_path:
- self._validate_storage_path(parameters)
- elif method == bulk_edit.modify_custom_fields:
- self._validate_parameters_modify_custom_fields(parameters)
- elif method == bulk_edit.set_permissions:
- self._validate_parameters_set_permissions(parameters)
- elif method == bulk_edit.rotate:
- self._validate_parameters_rotate(parameters)
- elif method == bulk_edit.split:
- if len(attrs["documents"]) > 1:
- raise serializers.ValidationError(
- "Split method only supports one document",
- )
- self._validate_parameters_split(parameters)
- elif method == bulk_edit.delete_pages:
- if len(attrs["documents"]) > 1:
- raise serializers.ValidationError(
- "Delete pages method only supports one document",
- )
- self._validate_parameters_delete_pages(parameters)
- elif method == bulk_edit.merge:
- self._validate_parameters_merge(parameters)
-
- return attrs
-
-
-class PostDocumentSerializer(serializers.Serializer):
- created = serializers.DateTimeField(
- label="Created",
- allow_null=True,
- write_only=True,
- required=False,
- )
-
- document = serializers.FileField(
- label="Document",
- write_only=True,
- )
-
- title = serializers.CharField(
- label="Title",
- write_only=True,
- required=False,
- )
-
- correspondent = serializers.PrimaryKeyRelatedField(
- queryset=Correspondent.objects.all(),
- label="Correspondent",
- allow_null=True,
- write_only=True,
- required=False,
- )
-
- document_type = serializers.PrimaryKeyRelatedField(
- queryset=DocumentType.objects.all(),
- label="Document type",
- allow_null=True,
- write_only=True,
- required=False,
- )
-
- storage_path = serializers.PrimaryKeyRelatedField(
- queryset=StoragePath.objects.all(),
- label="Storage path",
- allow_null=True,
- write_only=True,
- required=False,
- )
-
- tags = serializers.PrimaryKeyRelatedField(
- many=True,
- queryset=Tag.objects.all(),
- label="Tags",
- write_only=True,
- required=False,
- )
-
- archive_serial_number = serializers.IntegerField(
- label="ASN",
- write_only=True,
- required=False,
- min_value=Document.ARCHIVE_SERIAL_NUMBER_MIN,
- max_value=Document.ARCHIVE_SERIAL_NUMBER_MAX,
- )
-
- custom_fields = serializers.PrimaryKeyRelatedField(
- many=True,
- queryset=CustomField.objects.all(),
- label="Custom fields",
- write_only=True,
- required=False,
- )
-
- from_webui = serializers.BooleanField(
- label="Documents are from Paperless-ngx WebUI",
- write_only=True,
- required=False,
- )
-
- def validate_document(self, document):
- document_data = document.file.read()
- mime_type = magic.from_buffer(document_data, mime=True)
-
- if not is_mime_type_supported(mime_type):
- if (
- mime_type in settings.CONSUMER_PDF_RECOVERABLE_MIME_TYPES
- and document.name.endswith(
- ".pdf",
- )
- ):
- # If the file is an invalid PDF, we can try to recover it later in the consumer
- mime_type = "application/pdf"
- else:
- raise serializers.ValidationError(
- _("File type %(type)s not supported") % {"type": mime_type},
- )
-
- return document.name, document_data
-
- def validate_correspondent(self, correspondent):
- if correspondent:
- return correspondent.id
- else:
- return None
-
- def validate_document_type(self, document_type):
- if document_type:
- return document_type.id
- else:
- return None
-
- def validate_storage_path(self, storage_path):
- if storage_path:
- return storage_path.id
- else:
- return None
-
- def validate_tags(self, tags):
- if tags:
- return [tag.id for tag in tags]
- else:
- return None
-
- def validate_custom_fields(self, custom_fields):
- if custom_fields:
- return [custom_field.id for custom_field in custom_fields]
- else:
- return None
-
-
-class BulkDownloadSerializer(DocumentListSerializer):
- content = serializers.ChoiceField(
- choices=["archive", "originals", "both"],
- default="archive",
- )
-
- compression = serializers.ChoiceField(
- choices=["none", "deflated", "bzip2", "lzma"],
- default="none",
- )
-
- follow_formatting = serializers.BooleanField(
- default=False,
- )
-
- def validate_compression(self, compression):
- import zipfile
-
- return {
- "none": zipfile.ZIP_STORED,
- "deflated": zipfile.ZIP_DEFLATED,
- "bzip2": zipfile.ZIP_BZIP2,
- "lzma": zipfile.ZIP_LZMA,
- }[compression]
-
-
-class StoragePathSerializer(MatchingModelSerializer, OwnedObjectSerializer):
- class Meta:
- model = StoragePath
- fields = (
- "id",
- "slug",
- "name",
- "path",
- "match",
- "matching_algorithm",
- "is_insensitive",
- "document_count",
- "owner",
- "permissions",
- "user_can_change",
- "set_permissions",
- )
-
- def validate_path(self, path: str):
- converted_path = convert_format_str_to_template_format(path)
- if converted_path != path:
- logger.warning(
- f"Storage path {path} is not using the new style format, consider updating",
- )
- result = validate_filepath_template_and_render(converted_path)
-
- if result is None:
- raise serializers.ValidationError(_("Invalid variable detected."))
-
- return converted_path
-
- def update(self, instance, validated_data):
- """
- When a storage path is updated, see if documents
- using it require a rename/move
- """
- doc_ids = [doc.id for doc in instance.documents.all()]
- if len(doc_ids):
- bulk_edit.bulk_update_documents.delay(doc_ids)
-
- return super().update(instance, validated_data)
-
-
-class UiSettingsViewSerializer(serializers.ModelSerializer):
- class Meta:
- model = UiSettings
- depth = 1
- fields = [
- "id",
- "settings",
- ]
-
- def validate_settings(self, settings):
- # we never save update checking backend setting
- if "update_checking" in settings:
- try:
- settings["update_checking"].pop("backend_setting")
- except KeyError:
- pass
- return settings
-
- def create(self, validated_data):
-        ui_settings, _ = UiSettings.objects.update_or_create(
- user=validated_data.get("user"),
- defaults={"settings": validated_data.get("settings", None)},
- )
- return ui_settings
-
-
-class TasksViewSerializer(OwnedObjectSerializer):
- class Meta:
- model = PaperlessTask
- fields = (
- "id",
- "task_id",
- "task_name",
- "task_file_name",
- "date_created",
- "date_done",
- "type",
- "status",
- "result",
- "acknowledged",
- "related_document",
- "owner",
- )
-
- related_document = serializers.SerializerMethodField()
- created_doc_re = re.compile(r"New document id (\d+) created")
- duplicate_doc_re = re.compile(r"It is a duplicate of .* \(#(\d+)\)")
-
- def get_related_document(self, obj) -> str | None:
- result = None
-        pattern = None
-        if obj.result:
-            match obj.status:
-                case states.SUCCESS:
-                    pattern = self.created_doc_re
-                case states.FAILURE:
-                    pattern = (
-                        self.duplicate_doc_re
-                        if "existing document is in the trash" not in obj.result
-                        else None
-                    )
-            if pattern is not None:
-                try:
-                    result = pattern.search(obj.result).group(1)
- except Exception:
- pass
-
- return result
-
-
-class RunTaskViewSerializer(serializers.Serializer):
- task_name = serializers.ChoiceField(
- choices=PaperlessTask.TaskName.choices,
- label="Task Name",
- write_only=True,
- )
-
-
-class AcknowledgeTasksViewSerializer(serializers.Serializer):
- tasks = serializers.ListField(
- required=True,
- label="Tasks",
- write_only=True,
- child=serializers.IntegerField(),
- )
-
- def _validate_task_id_list(self, tasks, name="tasks"):
- if not isinstance(tasks, list):
- raise serializers.ValidationError(f"{name} must be a list")
- if not all(isinstance(i, int) for i in tasks):
- raise serializers.ValidationError(f"{name} must be a list of integers")
- count = PaperlessTask.objects.filter(id__in=tasks).count()
- if not count == len(tasks):
- raise serializers.ValidationError(
- f"Some tasks in {name} don't exist or were specified twice.",
- )
-
- def validate_tasks(self, tasks):
- self._validate_task_id_list(tasks)
- return tasks
-
-
-class ShareLinkSerializer(OwnedObjectSerializer):
- class Meta:
- model = ShareLink
- fields = (
- "id",
- "created",
- "expiration",
- "slug",
- "document",
- "file_version",
- )
-
- def create(self, validated_data):
- validated_data["slug"] = get_random_string(50)
- return super().create(validated_data)
-
-
-class BulkEditObjectsSerializer(SerializerWithPerms, SetPermissionsMixin):
- objects = serializers.ListField(
- required=True,
- allow_empty=False,
- label="Objects",
- write_only=True,
- child=serializers.IntegerField(),
- )
-
- object_type = serializers.ChoiceField(
- choices=[
- "tags",
- "correspondents",
- "document_types",
- "storage_paths",
- ],
- label="Object Type",
- write_only=True,
- )
-
- operation = serializers.ChoiceField(
- choices=[
- "set_permissions",
- "delete",
- ],
- label="Operation",
- required=True,
- write_only=True,
- )
-
- owner = serializers.PrimaryKeyRelatedField(
- queryset=User.objects.all(),
- required=False,
- allow_null=True,
- )
-
- permissions = serializers.DictField(
- label="Set permissions",
- allow_empty=False,
- required=False,
- write_only=True,
- )
-
- merge = serializers.BooleanField(
- default=False,
- write_only=True,
- required=False,
- )
-
- def get_object_class(self, object_type):
- object_class = None
- if object_type == "tags":
- object_class = Tag
- elif object_type == "correspondents":
- object_class = Correspondent
- elif object_type == "document_types":
- object_class = DocumentType
- elif object_type == "storage_paths":
- object_class = StoragePath
- return object_class
-
- def _validate_objects(self, objects, object_type):
- if not isinstance(objects, list):
- raise serializers.ValidationError("objects must be a list")
- if not all(isinstance(i, int) for i in objects):
- raise serializers.ValidationError("objects must be a list of integers")
- object_class = self.get_object_class(object_type)
- count = object_class.objects.filter(id__in=objects).count()
- if not count == len(objects):
- raise serializers.ValidationError(
- "Some ids in objects don't exist or were specified twice.",
- )
- return objects
-
- def _validate_permissions(self, permissions):
- self.validate_set_permissions(
- permissions,
- )
-
- def validate(self, attrs):
- object_type = attrs["object_type"]
- objects = attrs["objects"]
- operation = attrs.get("operation")
-
- self._validate_objects(objects, object_type)
-
- if operation == "set_permissions":
- permissions = attrs.get("permissions")
- if permissions is not None:
- self._validate_permissions(permissions)
-
- return attrs
-
-
-class WorkflowTriggerSerializer(serializers.ModelSerializer):
- id = serializers.IntegerField(required=False, allow_null=True)
- sources = fields.MultipleChoiceField(
- choices=WorkflowTrigger.DocumentSourceChoices.choices,
- allow_empty=True,
- default={
- DocumentSource.ConsumeFolder,
- DocumentSource.ApiUpload,
- DocumentSource.MailFetch,
- },
- )
-
- type = serializers.ChoiceField(
- choices=WorkflowTrigger.WorkflowTriggerType.choices,
- label="Trigger Type",
- )
-
- class Meta:
- model = WorkflowTrigger
- fields = [
- "id",
- "sources",
- "type",
- "filter_path",
- "filter_filename",
- "filter_mailrule",
- "matching_algorithm",
- "match",
- "is_insensitive",
- "filter_has_tags",
- "filter_has_correspondent",
- "filter_has_document_type",
- "schedule_offset_days",
- "schedule_is_recurring",
- "schedule_recurring_interval_days",
- "schedule_date_field",
- "schedule_date_custom_field",
- ]
-
- def validate(self, attrs):
- # Empty strings treated as None to avoid unexpected behavior
- if (
- "filter_filename" in attrs
- and attrs["filter_filename"] is not None
- and len(attrs["filter_filename"]) == 0
- ):
- attrs["filter_filename"] = None
- if (
- "filter_path" in attrs
- and attrs["filter_path"] is not None
- and len(attrs["filter_path"]) == 0
- ):
- attrs["filter_path"] = None
-
- if (
- attrs["type"] == WorkflowTrigger.WorkflowTriggerType.CONSUMPTION
- and "filter_mailrule" not in attrs
- and ("filter_filename" not in attrs or attrs["filter_filename"] is None)
- and ("filter_path" not in attrs or attrs["filter_path"] is None)
- ):
- raise serializers.ValidationError(
- "File name, path or mail rule filter are required",
- )
-
- return attrs
-
-
-class WorkflowActionEmailSerializer(serializers.ModelSerializer):
- id = serializers.IntegerField(allow_null=True, required=False)
-
- class Meta:
- model = WorkflowActionEmail
- fields = [
- "id",
- "subject",
- "body",
- "to",
- "include_document",
- ]
-
-
-class WorkflowActionWebhookSerializer(serializers.ModelSerializer):
- id = serializers.IntegerField(allow_null=True, required=False)
-
- def validate_url(self, url):
- url_validator(url)
- return url
-
- class Meta:
- model = WorkflowActionWebhook
- fields = [
- "id",
- "url",
- "use_params",
- "as_json",
- "params",
- "body",
- "headers",
- "include_document",
- ]
-
-
-class WorkflowActionSerializer(serializers.ModelSerializer):
- id = serializers.IntegerField(required=False, allow_null=True)
- assign_correspondent = CorrespondentField(allow_null=True, required=False)
- assign_tags = TagsField(many=True, allow_null=True, required=False)
- assign_document_type = DocumentTypeField(allow_null=True, required=False)
- assign_storage_path = StoragePathField(allow_null=True, required=False)
- email = WorkflowActionEmailSerializer(allow_null=True, required=False)
- webhook = WorkflowActionWebhookSerializer(allow_null=True, required=False)
-
- class Meta:
- model = WorkflowAction
- fields = [
- "id",
- "type",
- "assign_title",
- "assign_tags",
- "assign_correspondent",
- "assign_document_type",
- "assign_storage_path",
- "assign_owner",
- "assign_view_users",
- "assign_view_groups",
- "assign_change_users",
- "assign_change_groups",
- "assign_custom_fields",
- "assign_custom_fields_values",
- "remove_all_tags",
- "remove_tags",
- "remove_all_correspondents",
- "remove_correspondents",
- "remove_all_document_types",
- "remove_document_types",
- "remove_all_storage_paths",
- "remove_storage_paths",
- "remove_custom_fields",
- "remove_all_custom_fields",
- "remove_all_owners",
- "remove_owners",
- "remove_all_permissions",
- "remove_view_users",
- "remove_view_groups",
- "remove_change_users",
- "remove_change_groups",
- "email",
- "webhook",
- ]
-
- def validate(self, attrs):
- if "assign_title" in attrs and attrs["assign_title"] is not None:
- if len(attrs["assign_title"]) == 0:
- # Empty strings treated as None to avoid unexpected behavior
- attrs["assign_title"] = None
- else:
- try:
- # test against all placeholders, see consumer.py `parse_doc_title_w_placeholders`
- attrs["assign_title"].format(
- correspondent="",
- document_type="",
- added="",
- added_year="",
- added_year_short="",
- added_month="",
- added_month_name="",
- added_month_name_short="",
- added_day="",
- added_time="",
- owner_username="",
- original_filename="",
- filename="",
- created="",
- created_year="",
- created_year_short="",
- created_month="",
- created_month_name="",
- created_month_name_short="",
- created_day="",
- created_time="",
- )
- except (ValueError, KeyError) as e:
- raise serializers.ValidationError(
- {"assign_title": f'Invalid f-string detected: "{e.args[0]}"'},
- )
-
- if (
- "type" in attrs
- and attrs["type"] == WorkflowAction.WorkflowActionType.EMAIL
- and "email" not in attrs
- ):
- raise serializers.ValidationError(
- "Email data is required for email actions",
- )
-
- if (
- "type" in attrs
- and attrs["type"] == WorkflowAction.WorkflowActionType.WEBHOOK
- and "webhook" not in attrs
- ):
- raise serializers.ValidationError(
- "Webhook data is required for webhook actions",
- )
-
- return attrs
-
-
-class WorkflowSerializer(serializers.ModelSerializer):
- order = serializers.IntegerField(required=False)
-
- triggers = WorkflowTriggerSerializer(many=True)
- actions = WorkflowActionSerializer(many=True)
-
- class Meta:
- model = Workflow
- fields = [
- "id",
- "name",
- "order",
- "enabled",
- "triggers",
- "actions",
- ]
-
- def update_triggers_and_actions(self, instance: Workflow, triggers, actions):
- set_triggers = []
- set_actions = []
-
- if triggers is not None:
- for trigger in triggers:
- filter_has_tags = trigger.pop("filter_has_tags", None)
- trigger_instance, _ = WorkflowTrigger.objects.update_or_create(
- id=trigger.get("id"),
- defaults=trigger,
- )
- if filter_has_tags is not None:
- trigger_instance.filter_has_tags.set(filter_has_tags)
- set_triggers.append(trigger_instance)
-
- if actions is not None:
- for action in actions:
- assign_tags = action.pop("assign_tags", None)
- assign_view_users = action.pop("assign_view_users", None)
- assign_view_groups = action.pop("assign_view_groups", None)
- assign_change_users = action.pop("assign_change_users", None)
- assign_change_groups = action.pop("assign_change_groups", None)
- assign_custom_fields = action.pop("assign_custom_fields", None)
- remove_tags = action.pop("remove_tags", None)
- remove_correspondents = action.pop("remove_correspondents", None)
- remove_document_types = action.pop("remove_document_types", None)
- remove_storage_paths = action.pop("remove_storage_paths", None)
- remove_custom_fields = action.pop("remove_custom_fields", None)
- remove_owners = action.pop("remove_owners", None)
- remove_view_users = action.pop("remove_view_users", None)
- remove_view_groups = action.pop("remove_view_groups", None)
- remove_change_users = action.pop("remove_change_users", None)
- remove_change_groups = action.pop("remove_change_groups", None)
-
- email_data = action.pop("email", None)
- webhook_data = action.pop("webhook", None)
-
- action_instance, _ = WorkflowAction.objects.update_or_create(
- id=action.get("id"),
- defaults=action,
- )
-
- if email_data is not None:
- serializer = WorkflowActionEmailSerializer(data=email_data)
- serializer.is_valid(raise_exception=True)
- email, _ = WorkflowActionEmail.objects.update_or_create(
- id=email_data.get("id"),
- defaults=serializer.validated_data,
- )
- action_instance.email = email
- action_instance.save()
-
- if webhook_data is not None:
- serializer = WorkflowActionWebhookSerializer(data=webhook_data)
- serializer.is_valid(raise_exception=True)
- webhook, _ = WorkflowActionWebhook.objects.update_or_create(
- id=webhook_data.get("id"),
- defaults=serializer.validated_data,
- )
- action_instance.webhook = webhook
- action_instance.save()
-
- if assign_tags is not None:
- action_instance.assign_tags.set(assign_tags)
- if assign_view_users is not None:
- action_instance.assign_view_users.set(assign_view_users)
- if assign_view_groups is not None:
- action_instance.assign_view_groups.set(assign_view_groups)
- if assign_change_users is not None:
- action_instance.assign_change_users.set(assign_change_users)
- if assign_change_groups is not None:
- action_instance.assign_change_groups.set(assign_change_groups)
- if assign_custom_fields is not None:
- action_instance.assign_custom_fields.set(assign_custom_fields)
- if remove_tags is not None:
- action_instance.remove_tags.set(remove_tags)
- if remove_correspondents is not None:
- action_instance.remove_correspondents.set(remove_correspondents)
- if remove_document_types is not None:
- action_instance.remove_document_types.set(remove_document_types)
- if remove_storage_paths is not None:
- action_instance.remove_storage_paths.set(remove_storage_paths)
- if remove_custom_fields is not None:
- action_instance.remove_custom_fields.set(remove_custom_fields)
- if remove_owners is not None:
- action_instance.remove_owners.set(remove_owners)
- if remove_view_users is not None:
- action_instance.remove_view_users.set(remove_view_users)
- if remove_view_groups is not None:
- action_instance.remove_view_groups.set(remove_view_groups)
- if remove_change_users is not None:
- action_instance.remove_change_users.set(remove_change_users)
- if remove_change_groups is not None:
- action_instance.remove_change_groups.set(remove_change_groups)
-
- set_actions.append(action_instance)
-
- instance.triggers.set(set_triggers)
- instance.actions.set(set_actions)
- instance.save()
-
- def prune_triggers_and_actions(self):
- """
- ManyToMany fields dont support e.g. on_delete so we need to discard unattached
- triggers and actionas manually
- """
- for trigger in WorkflowTrigger.objects.all():
- if trigger.workflows.all().count() == 0:
- trigger.delete()
-
- for action in WorkflowAction.objects.all():
- if action.workflows.all().count() == 0:
- action.delete()
-
- WorkflowActionEmail.objects.filter(action=None).delete()
- WorkflowActionWebhook.objects.filter(action=None).delete()
-
- def create(self, validated_data) -> Workflow:
- if "triggers" in validated_data:
- triggers = validated_data.pop("triggers")
-
- if "actions" in validated_data:
- actions = validated_data.pop("actions")
-
- instance = super().create(validated_data)
-
- self.update_triggers_and_actions(instance, triggers, actions)
-
- return instance
-
- def update(self, instance: Workflow, validated_data) -> Workflow:
- if "triggers" in validated_data:
- triggers = validated_data.pop("triggers")
-
- if "actions" in validated_data:
- actions = validated_data.pop("actions")
-
- instance = super().update(instance, validated_data)
-
- self.update_triggers_and_actions(instance, triggers, actions)
-
- self.prune_triggers_and_actions()
-
- return instance
-
-
-class TrashSerializer(SerializerWithPerms):
- documents = serializers.ListField(
- required=False,
- label="Documents",
- write_only=True,
- child=serializers.IntegerField(),
- )
-
- action = serializers.ChoiceField(
- choices=["restore", "empty"],
- label="Action",
- write_only=True,
- )
-
- def validate_documents(self, documents):
- count = Document.deleted_objects.filter(id__in=documents).count()
- if not count == len(documents):
- raise serializers.ValidationError(
- "Some documents in the list have not yet been deleted.",
- )
- return documents
-
-
-class StoragePathTestSerializer(SerializerWithPerms):
- path = serializers.CharField(
- required=True,
- label="Path",
- write_only=True,
- )
-
- document = serializers.PrimaryKeyRelatedField(
- queryset=Document.objects.all(),
- required=True,
- label="Document",
- write_only=True,
- )
from django.contrib.auth.models import User
from rest_framework.test import APITestCase
-from documents.serialisers import DocumentSerializer
from documents.tests.utils import DirectoriesMixin
from paperless.models import CustomField
from paperless.models import Document
+from paperless.serialisers import DocumentSerializer
class DocumentWrapper:
+from __future__ import annotations
+
+import datetime
import logging
+import math
+import re
+import zoneinfo
+from decimal import Decimal
+from typing import TYPE_CHECKING
+
+import magic
+from celery import states
+from django.conf import settings
+from django.contrib.auth.models import Group
+from django.contrib.auth.models import User
+from django.contrib.contenttypes.models import ContentType
+from django.core.validators import DecimalValidator
+from django.core.validators import MaxLengthValidator
+from django.core.validators import RegexValidator
+from django.core.validators import integer_validator
+from django.utils.crypto import get_random_string
+from django.utils.text import slugify
+from django.utils.translation import gettext as _
+from drf_spectacular.utils import extend_schema_field
+from drf_writable_nested.serializers import NestedUpdateMixin
+from guardian.core import ObjectPermissionChecker
+from guardian.shortcuts import get_users_with_perms
+from guardian.utils import get_group_obj_perms_model
+from guardian.utils import get_user_obj_perms_model
+from rest_framework import fields
+from rest_framework import serializers
+from rest_framework.fields import SerializerMethodField
+
+if settings.AUDIT_LOG_ENABLED:
+ from auditlog.context import set_actor
+
+
+from documents import bulk_edit
+from documents.parsers import is_mime_type_supported
+from documents.permissions import get_groups_with_only_permission
+from documents.permissions import set_permissions_for_object
+from documents.templating.filepath import validate_filepath_template_and_render
+from documents.templating.utils import convert_format_str_to_template_format
+from paperless.data_models import DocumentSource
+from paperless.models import Correspondent
+from paperless.models import CustomField
+from paperless.models import CustomFieldInstance
+from paperless.models import Document
+from paperless.models import DocumentType
+from paperless.models import MatchingModel
+from paperless.models import Note
+from paperless.models import PaperlessTask
+from paperless.models import SavedView
+from paperless.models import SavedViewFilterRule
+from paperless.models import ShareLink
+from paperless.models import StoragePath
+from paperless.models import Tag
+from paperless.models import UiSettings
+from paperless.models import Workflow
+from paperless.models import WorkflowAction
+from paperless.models import WorkflowActionEmail
+from paperless.models import WorkflowActionWebhook
+from paperless.models import WorkflowTrigger
+from paperless.validators import uri_validator
+from paperless.validators import url_validator
+
+if TYPE_CHECKING:
+ from collections.abc import Iterable
+
from allauth.mfa.adapter import get_adapter as get_mfa_adapter
from allauth.mfa.models import Authenticator
from allauth.mfa.totp.internal.auth import TOTP
from allauth.socialaccount.models import SocialAccount
-from django.contrib.auth.models import Group
from django.contrib.auth.models import Permission
-from django.contrib.auth.models import User
-from rest_framework import serializers
from rest_framework.authtoken.serializers import AuthTokenSerializer
from paperless.models import ApplicationConfiguration
-from paperless_mail.serialisers import ObfuscatedPasswordField
-logger = logging.getLogger("paperless.settings")
+logger = logging.getLogger("paperless.serializers")
+
+
+class ObfuscatedPasswordField(serializers.CharField):
+ """
+ Sends a masked string of asterisks instead of the password in the clear
+ """
+
+ def to_representation(self, value) -> str:
+ return "*" * max(10, len(value))
+
+ def to_internal_value(self, data):
+ return data
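+ # Illustrative usage note (not part of the upstream class): for a serializer
+ # declaring `password = ObfuscatedPasswordField()`, to_representation("hunter2")
+ # renders "**********" (always at least ten asterisks), while to_internal_value
+ # passes the submitted value through unchanged so writes still round-trip.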
class PaperlessAuthTokenSerializer(AuthTokenSerializer):
class Meta:
model = ApplicationConfiguration
fields = "__all__"
+
+
+# https://www.django-rest-framework.org/api-guide/serializers/#example
+class DynamicFieldsModelSerializer(serializers.ModelSerializer):
+ """
+ A ModelSerializer that takes an additional `fields` argument that
+ controls which fields should be displayed.
+ """
+
+ def __init__(self, *args, **kwargs):
+ # Don't pass the 'fields' arg up to the superclass
+ fields = kwargs.pop("fields", None)
+
+ # Instantiate the superclass normally
+ super().__init__(*args, **kwargs)
+
+ if fields is not None:
+ # Drop any fields that are not specified in the `fields` argument.
+ allowed = set(fields)
+ existing = set(self.fields)
+ for field_name in existing - allowed:
+ self.fields.pop(field_name)
+
+
+class MatchingModelSerializer(serializers.ModelSerializer):
+ document_count = serializers.IntegerField(read_only=True)
+
+ def get_slug(self, obj) -> str:
+ return slugify(obj.name)
+
+ slug = SerializerMethodField()
+
+ def validate(self, data):
+ # TODO: remove pending https://github.com/encode/django-rest-framework/issues/7173
+ name = data.get(
+ "name",
+ self.instance.name if hasattr(self.instance, "name") else None,
+ )
+ owner = (
+ data["owner"]
+ if "owner" in data
+ else self.user
+ if hasattr(self, "user")
+ else None
+ )
+ pk = self.instance.pk if hasattr(self.instance, "pk") else None
+ if ("name" in data or "owner" in data) and self.Meta.model.objects.filter(
+ name=name,
+ owner=owner,
+ ).exclude(pk=pk).exists():
+ raise serializers.ValidationError(
+ {"error": "Object violates owner / name unique constraint"},
+ )
+ return data
+
+ def validate_match(self, match):
+ if (
+ "matching_algorithm" in self.initial_data
+ and self.initial_data["matching_algorithm"] == MatchingModel.MATCH_REGEX
+ ):
+ try:
+ re.compile(match)
+ except re.error as e:
+ raise serializers.ValidationError(
+ _("Invalid regular expression: %(error)s") % {"error": str(e.msg)},
+ )
+ return match
+
+
+class SetPermissionsMixin:
+ def _validate_user_ids(self, user_ids):
+ users = User.objects.none()
+ if user_ids is not None:
+ users = User.objects.filter(id__in=user_ids)
+ if not users.count() == len(user_ids):
+ raise serializers.ValidationError(
+ "Some users don't exist or were specified twice.",
+ )
+ return users
+
+ def _validate_group_ids(self, group_ids):
+ groups = Group.objects.none()
+ if group_ids is not None:
+ groups = Group.objects.filter(id__in=group_ids)
+ if not groups.count() == len(group_ids):
+ raise serializers.ValidationError(
+ "Some groups don't exist or were specified twice.",
+ )
+ return groups
+
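+ # Expected payload shape for validate_set_permissions (illustrative example,
+ # mirroring the SetPermissionsSerializer schema declared further down):
+ #   {"view": {"users": [1, 2], "groups": []}, "change": {"users": [], "groups": [3]}}
+ # An action key ("view"/"change") missing from a non-null payload is dropped
+ # from the returned dict entirely.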
+ def validate_set_permissions(self, set_permissions=None):
+ permissions_dict = {
+ "view": {},
+ "change": {},
+ }
+ if set_permissions is not None:
+ for action in ["view", "change"]:
+ if action in set_permissions:
+ if "users" in set_permissions[action]:
+ users = set_permissions[action]["users"]
+ permissions_dict[action]["users"] = self._validate_user_ids(
+ users,
+ )
+ if "groups" in set_permissions[action]:
+ groups = set_permissions[action]["groups"]
+ permissions_dict[action]["groups"] = self._validate_group_ids(
+ groups,
+ )
+ else:
+ del permissions_dict[action]
+ return permissions_dict
+
+ def _set_permissions(self, permissions, object):
+ set_permissions_for_object(permissions, object)
+
+
+class SerializerWithPerms(serializers.Serializer):
+ def __init__(self, *args, **kwargs):
+ self.user = kwargs.pop("user", None)
+ self.full_perms = kwargs.pop("full_perms", False)
+ self.all_fields = kwargs.pop("all_fields", False)
+ super().__init__(*args, **kwargs)
+
+
+@extend_schema_field(
+ field={
+ "type": "object",
+ "properties": {
+ "view": {
+ "type": "object",
+ "properties": {
+ "users": {
+ "type": "array",
+ "items": {"type": "integer"},
+ },
+ "groups": {
+ "type": "array",
+ "items": {"type": "integer"},
+ },
+ },
+ },
+ "change": {
+ "type": "object",
+ "properties": {
+ "users": {
+ "type": "array",
+ "items": {"type": "integer"},
+ },
+ "groups": {
+ "type": "array",
+ "items": {"type": "integer"},
+ },
+ },
+ },
+ },
+ },
+)
+class SetPermissionsSerializer(serializers.DictField):
+ pass
+
+
+class OwnedObjectSerializer(
+ SerializerWithPerms,
+ serializers.ModelSerializer,
+ SetPermissionsMixin,
+):
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+
+ if not self.all_fields:
+ try:
+ if self.full_perms:
+ self.fields.pop("user_can_change")
+ self.fields.pop("is_shared_by_requester")
+ else:
+ self.fields.pop("permissions")
+ except KeyError:
+ pass
+
+ @extend_schema_field(
+ field={
+ "type": "object",
+ "properties": {
+ "view": {
+ "type": "object",
+ "properties": {
+ "users": {
+ "type": "array",
+ "items": {"type": "integer"},
+ },
+ "groups": {
+ "type": "array",
+ "items": {"type": "integer"},
+ },
+ },
+ },
+ "change": {
+ "type": "object",
+ "properties": {
+ "users": {
+ "type": "array",
+ "items": {"type": "integer"},
+ },
+ "groups": {
+ "type": "array",
+ "items": {"type": "integer"},
+ },
+ },
+ },
+ },
+ },
+ )
+ def get_permissions(self, obj) -> dict:
+ view_codename = f"view_{obj.__class__.__name__.lower()}"
+ change_codename = f"change_{obj.__class__.__name__.lower()}"
+
+ return {
+ "view": {
+ "users": get_users_with_perms(
+ obj,
+ only_with_perms_in=[view_codename],
+ with_group_users=False,
+ ).values_list("id", flat=True),
+ "groups": get_groups_with_only_permission(
+ obj,
+ codename=view_codename,
+ ).values_list("id", flat=True),
+ },
+ "change": {
+ "users": get_users_with_perms(
+ obj,
+ only_with_perms_in=[change_codename],
+ with_group_users=False,
+ ).values_list("id", flat=True),
+ "groups": get_groups_with_only_permission(
+ obj,
+ codename=change_codename,
+ ).values_list("id", flat=True),
+ },
+ }
+
+ def get_user_can_change(self, obj) -> bool:
+ checker = ObjectPermissionChecker(self.user) if self.user is not None else None
+ return (
+ obj.owner is None
+ or obj.owner == self.user
+ or (
+ self.user is not None
+ and checker.has_perm(f"change_{obj.__class__.__name__.lower()}", obj)
+ )
+ )
+
+ @staticmethod
+ def get_shared_object_pks(objects: Iterable):
+ """
+ Return the primary keys of the subset of objects that are shared.
+ """
+ try:
+ first_obj = next(iter(objects))
+ except StopIteration:
+ return set()
+
+ ctype = ContentType.objects.get_for_model(first_obj)
+ object_pks = list(obj.pk for obj in objects)
+ pk_type = type(first_obj.pk)
+
+ def get_pks_for_permission_type(model):
+ return map(
+ pk_type, # coerce the pk to the same type as the provided objects
+ model.objects.filter(
+ content_type=ctype,
+ object_pk__in=object_pks,
+ )
+ .values_list("object_pk", flat=True)
+ .distinct(),
+ )
+
+ UserObjectPermission = get_user_obj_perms_model()
+ GroupObjectPermission = get_group_obj_perms_model()
+ user_permission_pks = get_pks_for_permission_type(UserObjectPermission)
+ group_permission_pks = get_pks_for_permission_type(GroupObjectPermission)
+
+ return set(user_permission_pks) | set(group_permission_pks)
+
+ def get_is_shared_by_requester(self, obj: Document) -> bool:
+ # First check the context to see if `shared_object_pks` is set by the parent.
+ shared_object_pks = self.context.get("shared_object_pks")
+ # If not just check if the current object is shared.
+ if shared_object_pks is None:
+ shared_object_pks = self.get_shared_object_pks([obj])
+ return obj.owner == self.user and obj.id in shared_object_pks
+
+ permissions = SerializerMethodField(read_only=True)
+ user_can_change = SerializerMethodField(read_only=True)
+ is_shared_by_requester = SerializerMethodField(read_only=True)
+
+ set_permissions = SetPermissionsSerializer(
+ label="Set permissions",
+ allow_empty=True,
+ required=False,
+ write_only=True,
+ )
+ # the remaining set_permissions helpers are provided by SetPermissionsMixin
+
+ def validate_unique_together(self, validated_data, instance=None):
+ # workaround for https://github.com/encode/django-rest-framework/issues/9358
+ if "owner" in validated_data and "name" in self.Meta.fields:
+ name = validated_data.get("name", instance.name if instance else None)
+ objects = (
+ self.Meta.model.objects.exclude(pk=instance.pk)
+ if instance
+ else self.Meta.model.objects.all()
+ )
+ not_unique = objects.filter(
+ owner=validated_data["owner"],
+ name=name,
+ ).exists()
+ if not_unique:
+ raise serializers.ValidationError(
+ {"error": "Object violates owner / name unique constraint"},
+ )
+
+ def create(self, validated_data):
+ # default to current user if not set
+ request = self.context.get("request")
+ if (
+ "owner" not in validated_data
+ or (request is not None and "owner" not in request.data)
+ ) and self.user:
+ validated_data["owner"] = self.user
+ permissions = None
+ if "set_permissions" in validated_data:
+ permissions = validated_data.pop("set_permissions")
+ self.validate_unique_together(validated_data)
+ instance = super().create(validated_data)
+ if permissions is not None:
+ self._set_permissions(permissions, instance)
+ return instance
+
+ def update(self, instance, validated_data):
+ if "set_permissions" in validated_data:
+ self._set_permissions(validated_data["set_permissions"], instance)
+ self.validate_unique_together(validated_data, instance)
+ return super().update(instance, validated_data)
+
+
+class OwnedObjectListSerializer(serializers.ListSerializer):
+ def to_representation(self, documents):
+ self.child.context["shared_object_pks"] = self.child.get_shared_object_pks(
+ documents,
+ )
+ return super().to_representation(documents)
+
+
+class CorrespondentSerializer(MatchingModelSerializer, OwnedObjectSerializer):
+ last_correspondence = serializers.DateTimeField(read_only=True, required=False)
+
+ class Meta:
+ model = Correspondent
+ fields = (
+ "id",
+ "slug",
+ "name",
+ "match",
+ "matching_algorithm",
+ "is_insensitive",
+ "document_count",
+ "last_correspondence",
+ "owner",
+ "permissions",
+ "user_can_change",
+ "set_permissions",
+ )
+
+
+class DocumentTypeSerializer(MatchingModelSerializer, OwnedObjectSerializer):
+ class Meta:
+ model = DocumentType
+ fields = (
+ "id",
+ "slug",
+ "name",
+ "match",
+ "matching_algorithm",
+ "is_insensitive",
+ "document_count",
+ "owner",
+ "permissions",
+ "user_can_change",
+ "set_permissions",
+ )
+
+
+class DeprecatedColors:
+ COLOURS = (
+ (1, "#a6cee3"),
+ (2, "#1f78b4"),
+ (3, "#b2df8a"),
+ (4, "#33a02c"),
+ (5, "#fb9a99"),
+ (6, "#e31a1c"),
+ (7, "#fdbf6f"),
+ (8, "#ff7f00"),
+ (9, "#cab2d6"),
+ (10, "#6a3d9a"),
+ (11, "#b15928"),
+ (12, "#000000"),
+ (13, "#cccccc"),
+ )
+
+
+@extend_schema_field(
+ serializers.ChoiceField(
+ choices=DeprecatedColors.COLOURS,
+ ),
+)
+class ColorField(serializers.Field):
+ def to_internal_value(self, data):
+ for id, color in DeprecatedColors.COLOURS:
+ if id == data:
+ return color
+ raise serializers.ValidationError
+
+ def to_representation(self, value):
+ for id, color in DeprecatedColors.COLOURS:
+ if color == value:
+ return id
+ return 1
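+ # Round-trip example based on the table above (illustrative): a client sends
+ # colour=2, which is stored as "#1f78b4"; on output "#1f78b4" maps back to 2,
+ # and any colour missing from the legacy table is reported as 1.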
+
+
+class TagSerializerVersion1(MatchingModelSerializer, OwnedObjectSerializer):
+ colour = ColorField(source="color", default="#a6cee3")
+
+ class Meta:
+ model = Tag
+ fields = (
+ "id",
+ "slug",
+ "name",
+ "colour",
+ "match",
+ "matching_algorithm",
+ "is_insensitive",
+ "is_inbox_tag",
+ "document_count",
+ "owner",
+ "permissions",
+ "user_can_change",
+ "set_permissions",
+ )
+
+
+class TagSerializer(MatchingModelSerializer, OwnedObjectSerializer):
+ def get_text_color(self, obj) -> str:
+ try:
+ h = obj.color.lstrip("#")
+ rgb = tuple(int(h[i : i + 2], 16) / 256 for i in (0, 2, 4))
+ luminance = math.sqrt(
+ 0.299 * math.pow(rgb[0], 2)
+ + 0.587 * math.pow(rgb[1], 2)
+ + 0.114 * math.pow(rgb[2], 2),
+ )
+ return "#ffffff" if luminance < 0.53 else "#000000"
+ except ValueError:
+ return "#000000"
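+ # The luminance above is a rough perceived-brightness heuristic (Rec. 601-style
+ # weights applied to the squared RGB channels); e.g. "#000000" yields 0.0 and
+ # gets white text, while "#ffffff" yields ~1.0 and gets black text.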
+
+ text_color = serializers.SerializerMethodField()
+
+ class Meta:
+ model = Tag
+ fields = (
+ "id",
+ "slug",
+ "name",
+ "color",
+ "text_color",
+ "match",
+ "matching_algorithm",
+ "is_insensitive",
+ "is_inbox_tag",
+ "document_count",
+ "owner",
+ "permissions",
+ "user_can_change",
+ "set_permissions",
+ )
+
+ def validate_color(self, color):
+ regex = r"#[0-9a-fA-F]{6}"
+ if not re.match(regex, color):
+ raise serializers.ValidationError(_("Invalid color."))
+ return color
+
+
+class CorrespondentField(serializers.PrimaryKeyRelatedField):
+ def get_queryset(self):
+ return Correspondent.objects.all()
+
+
+class TagsField(serializers.PrimaryKeyRelatedField):
+ def get_queryset(self):
+ return Tag.objects.all()
+
+
+class DocumentTypeField(serializers.PrimaryKeyRelatedField):
+ def get_queryset(self):
+ return DocumentType.objects.all()
+
+
+class StoragePathField(serializers.PrimaryKeyRelatedField):
+ def get_queryset(self):
+ return StoragePath.objects.all()
+
+
+class CustomFieldSerializer(serializers.ModelSerializer):
+ def __init__(self, *args, **kwargs):
+ context = kwargs.get("context")
+ self.api_version = int(
+ context.get("request").version
+ if context and context.get("request")
+ else settings.REST_FRAMEWORK["DEFAULT_VERSION"],
+ )
+ super().__init__(*args, **kwargs)
+
+ data_type = serializers.ChoiceField(
+ choices=CustomField.FieldDataType,
+ read_only=False,
+ )
+
+ document_count = serializers.IntegerField(read_only=True)
+
+ class Meta:
+ model = CustomField
+ fields = [
+ "id",
+ "name",
+ "data_type",
+ "extra_data",
+ "document_count",
+ ]
+
+ def validate(self, attrs):
+ # TODO: remove pending https://github.com/encode/django-rest-framework/issues/7173
+ name = attrs.get(
+ "name",
+ self.instance.name if hasattr(self.instance, "name") else None,
+ )
+ objects = (
+ self.Meta.model.objects.exclude(
+ pk=self.instance.pk,
+ )
+ if self.instance is not None
+ else self.Meta.model.objects.all()
+ )
+ if ("name" in attrs) and objects.filter(
+ name=name,
+ ).exists():
+ raise serializers.ValidationError(
+ {"error": "Object violates name unique constraint"},
+ )
+ if (
+ "data_type" in attrs
+ and attrs["data_type"] == CustomField.FieldDataType.SELECT
+ ) or (
+ self.instance
+ and self.instance.data_type == CustomField.FieldDataType.SELECT
+ ):
+ if (
+ "extra_data" not in attrs
+ or "select_options" not in attrs["extra_data"]
+ or not isinstance(attrs["extra_data"]["select_options"], list)
+ or len(attrs["extra_data"]["select_options"]) == 0
+ or not all(
+ len(option.get("label", "")) > 0
+ for option in attrs["extra_data"]["select_options"]
+ )
+ ):
+ raise serializers.ValidationError(
+ {"error": "extra_data.select_options must be a valid list"},
+ )
+ # labels are valid, generate ids if not present
+ for option in attrs["extra_data"]["select_options"]:
+ if option.get("id") is None:
+ option["id"] = get_random_string(length=16)
+ elif (
+ "data_type" in attrs
+ and attrs["data_type"] == CustomField.FieldDataType.MONETARY
+ and "extra_data" in attrs
+ and "default_currency" in attrs["extra_data"]
+ and attrs["extra_data"]["default_currency"] is not None
+ and (
+ not isinstance(attrs["extra_data"]["default_currency"], str)
+ or (
+ len(attrs["extra_data"]["default_currency"]) > 0
+ and len(attrs["extra_data"]["default_currency"]) != 3
+ )
+ )
+ ):
+ raise serializers.ValidationError(
+ {"error": "extra_data.default_currency must be a 3-character string"},
+ )
+ return super().validate(attrs)
+
+ def to_internal_value(self, data):
+ ret = super().to_internal_value(data)
+
+ if (
+ self.api_version < 7
+ and ret.get("data_type", "") == CustomField.FieldDataType.SELECT
+ and isinstance(ret.get("extra_data", {}).get("select_options"), list)
+ ):
+ ret["extra_data"]["select_options"] = [
+ {
+ "label": option,
+ "id": get_random_string(length=16),
+ }
+ for option in ret["extra_data"]["select_options"]
+ ]
+
+ return ret
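+ # Illustrative example of the legacy shim above: an API v6 client may post
+ # extra_data.select_options as ["Option A", "Option B"]; each entry is wrapped
+ # into {"label": ..., "id": <random 16-char string>} before validation runs.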
+
+ def to_representation(self, instance):
+ ret = super().to_representation(instance)
+
+ if (
+ self.api_version < 7
+ and instance.data_type == CustomField.FieldDataType.SELECT
+ ):
+ # Convert the select options with ids to a list of strings
+ ret["extra_data"]["select_options"] = [
+ option["label"] for option in ret["extra_data"]["select_options"]
+ ]
+
+ return ret
+
+
+class ReadWriteSerializerMethodField(serializers.SerializerMethodField):
+ """
+ Based on https://stackoverflow.com/a/62579804
+ """
+
+ def __init__(self, method_name=None, *args, **kwargs):
+ self.method_name = method_name
+ kwargs["source"] = "*"
+ super(serializers.SerializerMethodField, self).__init__(*args, **kwargs)
+
+ def to_internal_value(self, data):
+ return {self.field_name: data}
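+ # Why this works (sketch): because the field is declared with source="*", the
+ # dict returned here is merged into validated_data, so the submitted value is
+ # later available as validated_data["value"] in validate()/create() below.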
+
+
+class CustomFieldInstanceSerializer(serializers.ModelSerializer):
+ field = serializers.PrimaryKeyRelatedField(queryset=CustomField.objects.all())
+ value = ReadWriteSerializerMethodField(allow_null=True)
+
+ def create(self, validated_data):
+ # An instance is attached to a document
+ document: Document = validated_data["document"]
+ # And to a CustomField
+ custom_field: CustomField = validated_data["field"]
+ # This key must exist, as it is validated
+ data_store_name = CustomFieldInstance.get_value_field_name(
+ custom_field.data_type,
+ )
+
+ if custom_field.data_type == CustomField.FieldDataType.DOCUMENTLINK:
+ # run prior to the update so we can detect doc links that are about to be removed
+ bulk_edit.reflect_doclinks(document, custom_field, validated_data["value"])
+
+ # Actually update or create the instance, providing the value
+ # to fill in the correct attribute based on the type
+ instance, _ = CustomFieldInstance.objects.update_or_create(
+ document=document,
+ field=custom_field,
+ defaults={data_store_name: validated_data["value"]},
+ )
+ return instance
+
+ def get_value(self, obj: CustomFieldInstance) -> str | int | float | dict | None:
+ return obj.value
+
+ def validate(self, data):
+ """
+ Because of the somewhat unusual way the value is exposed, model-level
+ validation never runs against the "value" field, so it has to be re-created here.
+
+ Not ideal, but better than returning an HTTP 500 when the database
+ rejects the value.
+ """
+ data = super().validate(data)
+ field: CustomField = data["field"]
+ if "value" in data and data["value"] is not None:
+ if (
+ field.data_type == CustomField.FieldDataType.URL
+ and len(data["value"]) > 0
+ ):
+ uri_validator(data["value"])
+ elif field.data_type == CustomField.FieldDataType.INT:
+ integer_validator(data["value"])
+ elif (
+ field.data_type == CustomField.FieldDataType.MONETARY
+ and data["value"] != ""
+ ):
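+ # Two formats are accepted here (inferred from the validators below): a bare
+ # decimal such as "123.45" (the legacy form) or a value carrying a 3-letter
+ # currency prefix such as "GBP123.45".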
+ try:
+ # First try to validate as a number from legacy format
+ DecimalValidator(max_digits=12, decimal_places=2)(
+ Decimal(str(data["value"])),
+ )
+ except Exception:
+ # If that fails, try to validate as a monetary string
+ RegexValidator(
+ regex=r"^[A-Z]{3}-?\d+(\.\d{1,2})$",
+ message="Must be a two-decimal number with optional currency code e.g. GBP123.45",
+ )(data["value"])
+ elif field.data_type == CustomField.FieldDataType.STRING:
+ MaxLengthValidator(limit_value=128)(data["value"])
+ elif field.data_type == CustomField.FieldDataType.SELECT:
+ select_options = field.extra_data["select_options"]
+ try:
+ next(
+ option
+ for option in select_options
+ if option["id"] == data["value"]
+ )
+ except Exception:
+ raise serializers.ValidationError(
+ f"Value must be an id of an element in {select_options}",
+ )
+ elif field.data_type == CustomField.FieldDataType.DOCUMENTLINK:
+ if not (isinstance(data["value"], list) or data["value"] is None):
+ raise serializers.ValidationError(
+ "Value must be a list",
+ )
+ doc_ids = data["value"]
+ if Document.objects.filter(id__in=doc_ids).count() != len(
+ data["value"],
+ ):
+ raise serializers.ValidationError(
+ "Some documents in value don't exist or were specified twice.",
+ )
+
+ return data
+
+ def get_api_version(self):
+ return int(
+ self.context.get("request").version
+ if self.context.get("request")
+ else settings.REST_FRAMEWORK["DEFAULT_VERSION"],
+ )
+
+ def to_internal_value(self, data):
+ ret = super().to_internal_value(data)
+
+ if (
+ self.get_api_version() < 7
+ and ret.get("field").data_type == CustomField.FieldDataType.SELECT
+ and ret.get("value") is not None
+ ):
+ # Convert the index of the option in the field.extra_data["select_options"]
+ # list to the option's unique id
+ ret["value"] = ret.get("field").extra_data["select_options"][ret["value"]][
+ "id"
+ ]
+
+ return ret
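+ # Illustrative example of the version shim above and below: for a select field
+ # whose extra_data["select_options"] is [{"id": "abc123...", "label": "A"}, ...],
+ # an API v6 client reads and writes the option *index* (0), while v7+ clients
+ # use the option id itself.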
+
+ def to_representation(self, instance):
+ ret = super().to_representation(instance)
+
+ if (
+ self.get_api_version() < 7
+ and instance.field.data_type == CustomField.FieldDataType.SELECT
+ ):
+ # return the index of the option in the field.extra_data["select_options"] list
+ ret["value"] = next(
+ (
+ idx
+ for idx, option in enumerate(
+ instance.field.extra_data["select_options"],
+ )
+ if option["id"] == instance.value
+ ),
+ None,
+ )
+
+ return ret
+
+ class Meta:
+ model = CustomFieldInstance
+ fields = [
+ "value",
+ "field",
+ ]
+
+
+class BasicUserSerializer(serializers.ModelSerializer):
+ # Different from paperless.serializers.UserSerializer
+ class Meta:
+ model = User
+ fields = ["id", "username", "first_name", "last_name"]
+
+
+class NotesSerializer(serializers.ModelSerializer):
+ user = BasicUserSerializer(read_only=True)
+
+ class Meta:
+ model = Note
+ fields = ["id", "note", "created", "user"]
+ ordering = ["-created"]
+
+
+class DocumentSerializer(
+ OwnedObjectSerializer,
+ NestedUpdateMixin,
+ DynamicFieldsModelSerializer,
+):
+ correspondent = CorrespondentField(allow_null=True)
+ tags = TagsField(many=True)
+ document_type = DocumentTypeField(allow_null=True)
+ storage_path = StoragePathField(allow_null=True)
+
+ original_file_name = SerializerMethodField()
+ archived_file_name = SerializerMethodField()
+ created_date = serializers.DateField(required=False)
+ page_count = SerializerMethodField()
+
+ notes = NotesSerializer(many=True, required=False, read_only=True)
+
+ custom_fields = CustomFieldInstanceSerializer(
+ many=True,
+ allow_null=False,
+ required=False,
+ )
+
+ owner = serializers.PrimaryKeyRelatedField(
+ queryset=User.objects.all(),
+ required=False,
+ allow_null=True,
+ )
+
+ remove_inbox_tags = serializers.BooleanField(
+ default=False,
+ write_only=True,
+ allow_null=True,
+ required=False,
+ )
+
+ def get_page_count(self, obj) -> int | None:
+ return obj.page_count
+
+ def get_original_file_name(self, obj) -> str | None:
+ return obj.original_filename
+
+ def get_archived_file_name(self, obj) -> str | None:
+ if obj.has_archive_version:
+ return obj.get_public_filename(archive=True)
+ else:
+ return None
+
+ def to_representation(self, instance):
+ doc = super().to_representation(instance)
+ if self.truncate_content and "content" in self.fields:
+ doc["content"] = doc.get("content")[0:550]
+ return doc
+
+ def validate(self, attrs):
+ if (
+ "archive_serial_number" in attrs
+ and attrs["archive_serial_number"] is not None
+ and len(str(attrs["archive_serial_number"])) > 0
+ and Document.deleted_objects.filter(
+ archive_serial_number=attrs["archive_serial_number"],
+ ).exists()
+ ):
+ raise serializers.ValidationError(
+ {
+ "archive_serial_number": [
+ "Document with this Archive Serial Number already exists in the trash.",
+ ],
+ },
+ )
+ return super().validate(attrs)
+
+ def update(self, instance: Document, validated_data):
+ if "created_date" in validated_data and "created" not in validated_data:
+ new_datetime = datetime.datetime.combine(
+ validated_data.get("created_date"),
+ datetime.time(0, 0, 0, 0, zoneinfo.ZoneInfo(settings.TIME_ZONE)),
+ )
+ instance.created = new_datetime
+ instance.save()
+ if "created_date" in validated_data:
+ validated_data.pop("created_date")
+ if instance.custom_fields.count() > 0 and "custom_fields" in validated_data:
+ incoming_custom_fields = [
+ field["field"] for field in validated_data["custom_fields"]
+ ]
+ for custom_field_instance in instance.custom_fields.filter(
+ field__data_type=CustomField.FieldDataType.DOCUMENTLINK,
+ ):
+ if (
+ custom_field_instance.field not in incoming_custom_fields
+ and custom_field_instance.value is not None
+ ):
+ # Doc link field is being removed entirely
+ for doc_id in custom_field_instance.value:
+ bulk_edit.remove_doclink(
+ instance,
+ custom_field_instance.field,
+ doc_id,
+ )
+ if validated_data.get("remove_inbox_tags"):
+ tag_ids_being_added = (
+ [
+ tag.id
+ for tag in validated_data["tags"]
+ if tag not in instance.tags.all()
+ ]
+ if "tags" in validated_data
+ else []
+ )
+ inbox_tags_not_being_added = Tag.objects.filter(is_inbox_tag=True).exclude(
+ id__in=tag_ids_being_added,
+ )
+ if "tags" in validated_data:
+ validated_data["tags"] = [
+ tag
+ for tag in validated_data["tags"]
+ if tag not in inbox_tags_not_being_added
+ ]
+ else:
+ validated_data["tags"] = [
+ tag
+ for tag in instance.tags.all()
+ if tag not in inbox_tags_not_being_added
+ ]
+ if settings.AUDIT_LOG_ENABLED:
+ with set_actor(self.user):
+ super().update(instance, validated_data)
+ else:
+ super().update(instance, validated_data)
+ # hard delete custom field instances that were soft deleted
+ CustomFieldInstance.deleted_objects.filter(document=instance).delete()
+ return instance
+
+ def __init__(self, *args, **kwargs):
+ self.truncate_content = kwargs.pop("truncate_content", False)
+
+ # return full permissions if we're doing a PATCH or PUT
+ context = kwargs.get("context")
+ if context is not None and (
+ context.get("request").method == "PATCH"
+ or context.get("request").method == "PUT"
+ ):
+ kwargs["full_perms"] = True
+
+ super().__init__(*args, **kwargs)
+
+ class Meta:
+ model = Document
+ fields = (
+ "id",
+ "correspondent",
+ "document_type",
+ "storage_path",
+ "title",
+ "content",
+ "tags",
+ "created",
+ "created_date",
+ "modified",
+ "added",
+ "deleted_at",
+ "archive_serial_number",
+ "original_file_name",
+ "archived_file_name",
+ "owner",
+ "permissions",
+ "user_can_change",
+ "is_shared_by_requester",
+ "set_permissions",
+ "notes",
+ "custom_fields",
+ "remove_inbox_tags",
+ "page_count",
+ "mime_type",
+ )
+ list_serializer_class = OwnedObjectListSerializer
+
+
+class SearchResultListSerializer(serializers.ListSerializer):
+ def to_representation(self, hits):
+ document_ids = [hit["id"] for hit in hits]
+ # Fetch all Document objects in the list in one SQL query.
+ documents = self.child.fetch_documents(document_ids)
+ self.child.context["documents"] = documents
+ # Also check if they are shared with other users / groups.
+ self.child.context["shared_object_pks"] = self.child.get_shared_object_pks(
+ documents.values(),
+ )
+
+ return super().to_representation(hits)
+
+
+class SearchResultSerializer(DocumentSerializer):
+ @staticmethod
+ def fetch_documents(ids):
+ """
+ Return a dict that maps given document IDs to Document objects.
+ """
+ return {
+ document.id: document
+ for document in Document.objects.select_related(
+ "correspondent",
+ "storage_path",
+ "document_type",
+ "owner",
+ )
+ .prefetch_related("tags", "custom_fields", "notes")
+ .filter(id__in=ids)
+ }
+
+ def to_representation(self, hit):
+ # Again we first check if the parent has already fetched the documents.
+ documents = self.context.get("documents")
+ # Otherwise we fetch this document.
+ if documents is None: # pragma: no cover
+ # In practice we only serialize **lists** of whoosh.searching.Hit.
+ # I'm keeping this check for completeness but marking it no cover for now.
+ documents = self.fetch_documents([hit["id"]])
+ document = documents[hit["id"]]
+
+ notes = ",".join(
+ [str(c.note) for c in document.notes.all()],
+ )
+ r = super().to_representation(document)
+ r["__search_hit__"] = {
+ "score": hit.score,
+ "highlights": hit.highlights("content", text=document.content),
+ "note_highlights": (
+ hit.highlights("notes", text=notes) if document else None
+ ),
+ "rank": hit.rank,
+ }
+
+ return r
+
+ class Meta(DocumentSerializer.Meta):
+ list_serializer_class = SearchResultListSerializer
+
+
+class SavedViewFilterRuleSerializer(serializers.ModelSerializer):
+ class Meta:
+ model = SavedViewFilterRule
+ fields = ["rule_type", "value"]
+
+
+class SavedViewSerializer(OwnedObjectSerializer):
+ filter_rules = SavedViewFilterRuleSerializer(many=True)
+
+ class Meta:
+ model = SavedView
+ fields = [
+ "id",
+ "name",
+ "show_on_dashboard",
+ "show_in_sidebar",
+ "sort_field",
+ "sort_reverse",
+ "filter_rules",
+ "page_size",
+ "display_mode",
+ "display_fields",
+ "owner",
+ "permissions",
+ "user_can_change",
+ "set_permissions",
+ ]
+
+ def validate(self, attrs):
+ attrs = super().validate(attrs)
+ if "display_fields" in attrs and attrs["display_fields"] is not None:
+ for field in attrs["display_fields"]:
+ if (
+ SavedView.DisplayFields.CUSTOM_FIELD[:-2] in field
+ ): # i.e. check for 'custom_field_' prefix
+ field_id = int(re.search(r"\d+", field)[0])
+ if not CustomField.objects.filter(id=field_id).exists():
+ raise serializers.ValidationError(
+ f"Invalid field: {field}",
+ )
+ elif field not in SavedView.DisplayFields.values:
+ raise serializers.ValidationError(
+ f"Invalid field: {field}",
+ )
+ return attrs
+
+ def update(self, instance, validated_data):
+ if "filter_rules" in validated_data:
+ rules_data = validated_data.pop("filter_rules")
+ else:
+ rules_data = None
+ if "user" in validated_data:
+ # backwards compatibility
+ validated_data["owner"] = validated_data.pop("user")
+ if (
+ "display_fields" in validated_data
+ and isinstance(
+ validated_data["display_fields"],
+ list,
+ )
+ and len(validated_data["display_fields"]) == 0
+ ):
+ validated_data["display_fields"] = None
+ super().update(instance, validated_data)
+ if rules_data is not None:
+ SavedViewFilterRule.objects.filter(saved_view=instance).delete()
+ for rule_data in rules_data:
+ SavedViewFilterRule.objects.create(saved_view=instance, **rule_data)
+ return instance
+
+ def create(self, validated_data):
+ rules_data = validated_data.pop("filter_rules")
+ if "user" in validated_data:
+ # backwards compatibility
+ validated_data["owner"] = validated_data.pop("user")
+ saved_view = SavedView.objects.create(**validated_data)
+ for rule_data in rules_data:
+ SavedViewFilterRule.objects.create(saved_view=saved_view, **rule_data)
+ return saved_view
+
+
+class DocumentListSerializer(serializers.Serializer):
+ documents = serializers.ListField(
+ required=True,
+ label="Documents",
+ write_only=True,
+ child=serializers.IntegerField(),
+ )
+
+ def _validate_document_id_list(self, documents, name="documents"):
+ if not isinstance(documents, list):
+ raise serializers.ValidationError(f"{name} must be a list")
+ if not all(isinstance(i, int) for i in documents):
+ raise serializers.ValidationError(f"{name} must be a list of integers")
+ count = Document.objects.filter(id__in=documents).count()
+ if not count == len(documents):
+ raise serializers.ValidationError(
+ f"Some documents in {name} don't exist or were specified twice.",
+ )
+
+ def validate_documents(self, documents):
+ self._validate_document_id_list(documents)
+ return documents
+
+
+class BulkEditSerializer(
+ SerializerWithPerms,
+ DocumentListSerializer,
+ SetPermissionsMixin,
+):
+ method = serializers.ChoiceField(
+ choices=[
+ "set_correspondent",
+ "set_document_type",
+ "set_storage_path",
+ "add_tag",
+ "remove_tag",
+ "modify_tags",
+ "modify_custom_fields",
+ "delete",
+ "reprocess",
+ "set_permissions",
+ "rotate",
+ "merge",
+ "split",
+ "delete_pages",
+ ],
+ label="Method",
+ write_only=True,
+ )
+
+ parameters = serializers.DictField(allow_empty=True, default={}, write_only=True)
+
+ def _validate_tag_id_list(self, tags, name="tags"):
+ if not isinstance(tags, list):
+ raise serializers.ValidationError(f"{name} must be a list")
+ if not all(isinstance(i, int) for i in tags):
+ raise serializers.ValidationError(f"{name} must be a list of integers")
+ count = Tag.objects.filter(id__in=tags).count()
+ if not count == len(tags):
+ raise serializers.ValidationError(
+ f"Some tags in {name} don't exist or were specified twice.",
+ )
+
+ def _validate_custom_field_id_list_or_dict(
+ self,
+ custom_fields,
+ name="custom_fields",
+ ):
+ ids = custom_fields
+ if isinstance(custom_fields, dict):
+ try:
+ ids = [int(i[0]) for i in custom_fields.items()]
+ except Exception as e:
+ logger.exception(f"Error validating custom fields: {e}")
+ raise serializers.ValidationError(
+ f"{name} must be a list of integers or a dict of id:value pairs, see the log for details",
+ )
+ elif not isinstance(custom_fields, list) or not all(
+ isinstance(i, int) for i in ids
+ ):
+ raise serializers.ValidationError(
+ f"{name} must be a list of integers or a dict of id:value pairs",
+ )
+ count = CustomField.objects.filter(id__in=ids).count()
+ if not count == len(ids):
+ raise serializers.ValidationError(
+ f"Some custom fields in {name} don't exist or were specified twice.",
+ )
+
+ def validate_method(self, method):
+ if method == "set_correspondent":
+ return bulk_edit.set_correspondent
+ elif method == "set_document_type":
+ return bulk_edit.set_document_type
+ elif method == "set_storage_path":
+ return bulk_edit.set_storage_path
+ elif method == "add_tag":
+ return bulk_edit.add_tag
+ elif method == "remove_tag":
+ return bulk_edit.remove_tag
+ elif method == "modify_tags":
+ return bulk_edit.modify_tags
+ elif method == "modify_custom_fields":
+ return bulk_edit.modify_custom_fields
+ elif method == "delete":
+ return bulk_edit.delete
+ elif method == "redo_ocr" or method == "reprocess":
+ return bulk_edit.reprocess
+ elif method == "set_permissions":
+ return bulk_edit.set_permissions
+ elif method == "rotate":
+ return bulk_edit.rotate
+ elif method == "merge":
+ return bulk_edit.merge
+ elif method == "split":
+ return bulk_edit.split
+ elif method == "delete_pages":
+ return bulk_edit.delete_pages
+ else:
+ raise serializers.ValidationError("Unsupported method.")
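+ # Note: "redo_ocr" is handled above as a legacy alias for "reprocess", even
+ # though it is not listed in the `method` choices of this serializer.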
+
+ def _validate_parameters_tags(self, parameters):
+ if "tag" in parameters:
+ tag_id = parameters["tag"]
+ try:
+ Tag.objects.get(id=tag_id)
+ except Tag.DoesNotExist:
+ raise serializers.ValidationError("Tag does not exist")
+ else:
+ raise serializers.ValidationError("tag not specified")
+
+ def _validate_parameters_document_type(self, parameters):
+ if "document_type" in parameters:
+ document_type_id = parameters["document_type"]
+ if document_type_id is None:
+ # None is ok
+ return
+ try:
+ DocumentType.objects.get(id=document_type_id)
+ except DocumentType.DoesNotExist:
+ raise serializers.ValidationError("Document type does not exist")
+ else:
+ raise serializers.ValidationError("document_type not specified")
+
+ def _validate_parameters_correspondent(self, parameters):
+ if "correspondent" in parameters:
+ correspondent_id = parameters["correspondent"]
+ if correspondent_id is None:
+ return
+ try:
+ Correspondent.objects.get(id=correspondent_id)
+ except Correspondent.DoesNotExist:
+ raise serializers.ValidationError("Correspondent does not exist")
+ else:
+ raise serializers.ValidationError("correspondent not specified")
+
+ def _validate_storage_path(self, parameters):
+ if "storage_path" in parameters:
+ storage_path_id = parameters["storage_path"]
+ if storage_path_id is None:
+ return
+ try:
+ StoragePath.objects.get(id=storage_path_id)
+ except StoragePath.DoesNotExist:
+ raise serializers.ValidationError(
+ "Storage path does not exist",
+ )
+ else:
+ raise serializers.ValidationError("storage path not specified")
+
+ def _validate_parameters_modify_tags(self, parameters):
+ if "add_tags" in parameters:
+ self._validate_tag_id_list(parameters["add_tags"], "add_tags")
+ else:
+ raise serializers.ValidationError("add_tags not specified")
+
+ if "remove_tags" in parameters:
+ self._validate_tag_id_list(parameters["remove_tags"], "remove_tags")
+ else:
+ raise serializers.ValidationError("remove_tags not specified")
+
+ def _validate_parameters_modify_custom_fields(self, parameters):
+ if "add_custom_fields" in parameters:
+ self._validate_custom_field_id_list_or_dict(
+ parameters["add_custom_fields"],
+ "add_custom_fields",
+ )
+ else:
+ raise serializers.ValidationError("add_custom_fields not specified")
+
+ if "remove_custom_fields" in parameters:
+ self._validate_custom_field_id_list_or_dict(
+ parameters["remove_custom_fields"],
+ "remove_custom_fields",
+ )
+ else:
+ raise serializers.ValidationError("remove_custom_fields not specified")
+
+ def _validate_owner(self, owner):
+ ownerUser = User.objects.get(pk=owner)
+ if ownerUser is None:
+ raise serializers.ValidationError("Specified owner cannot be found")
+ return ownerUser
+
+ def _validate_parameters_set_permissions(self, parameters):
+ parameters["set_permissions"] = self.validate_set_permissions(
+ parameters["set_permissions"],
+ )
+ if "owner" in parameters and parameters["owner"] is not None:
+ self._validate_owner(parameters["owner"])
+ if "merge" not in parameters:
+ parameters["merge"] = False
+
+ def _validate_parameters_rotate(self, parameters):
+ try:
+ if (
+ "degrees" not in parameters
+ or not float(parameters["degrees"]).is_integer()
+ ):
+ raise serializers.ValidationError("invalid rotation degrees")
+ except ValueError:
+ raise serializers.ValidationError("invalid rotation degrees")
+
+ def _validate_parameters_split(self, parameters):
+ if "pages" not in parameters:
+ raise serializers.ValidationError("pages not specified")
+ try:
+ pages = []
+ docs = parameters["pages"].split(",")
+ for doc in docs:
+ if "-" in doc:
+ pages.append(
+ [
+ x
+ for x in range(
+ int(doc.split("-")[0]),
+ int(doc.split("-")[1]) + 1,
+ )
+ ],
+ )
+ else:
+ pages.append([int(doc)])
+ parameters["pages"] = pages
+ except ValueError:
+ raise serializers.ValidationError("invalid pages specified")
+
+ if "delete_originals" in parameters:
+ if not isinstance(parameters["delete_originals"], bool):
+ raise serializers.ValidationError("delete_originals must be a boolean")
+ else:
+ parameters["delete_originals"] = False
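+ # Parsing example for the split parameters above (illustrative):
+ # parameters["pages"] == "1-3,5" is rewritten to [[1, 2, 3], [5]], with each
+ # comma-separated group presumably describing one resulting document.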
+
+ def _validate_parameters_delete_pages(self, parameters):
+ if "pages" not in parameters:
+ raise serializers.ValidationError("pages not specified")
+ if not isinstance(parameters["pages"], list):
+ raise serializers.ValidationError("pages must be a list")
+ if not all(isinstance(i, int) for i in parameters["pages"]):
+ raise serializers.ValidationError("pages must be a list of integers")
+
+ def _validate_parameters_merge(self, parameters):
+ if "delete_originals" in parameters:
+ if not isinstance(parameters["delete_originals"], bool):
+ raise serializers.ValidationError("delete_originals must be a boolean")
+ else:
+ parameters["delete_originals"] = False
+ if "archive_fallback" in parameters:
+ if not isinstance(parameters["archive_fallback"], bool):
+ raise serializers.ValidationError("archive_fallback must be a boolean")
+ else:
+ parameters["archive_fallback"] = False
+
+ def validate(self, attrs):
+ method = attrs["method"]
+ parameters = attrs["parameters"]
+
+ if method == bulk_edit.set_correspondent:
+ self._validate_parameters_correspondent(parameters)
+ elif method == bulk_edit.set_document_type:
+ self._validate_parameters_document_type(parameters)
+ elif method == bulk_edit.add_tag or method == bulk_edit.remove_tag:
+ self._validate_parameters_tags(parameters)
+ elif method == bulk_edit.modify_tags:
+ self._validate_parameters_modify_tags(parameters)
+ elif method == bulk_edit.set_storage_path:
+ self._validate_storage_path(parameters)
+ elif method == bulk_edit.modify_custom_fields:
+ self._validate_parameters_modify_custom_fields(parameters)
+ elif method == bulk_edit.set_permissions:
+ self._validate_parameters_set_permissions(parameters)
+ elif method == bulk_edit.rotate:
+ self._validate_parameters_rotate(parameters)
+ elif method == bulk_edit.split:
+ if len(attrs["documents"]) > 1:
+ raise serializers.ValidationError(
+ "Split method only supports one document",
+ )
+ self._validate_parameters_split(parameters)
+ elif method == bulk_edit.delete_pages:
+ if len(attrs["documents"]) > 1:
+ raise serializers.ValidationError(
+ "Delete pages method only supports one document",
+ )
+ self._validate_parameters_delete_pages(parameters)
+ elif method == bulk_edit.merge:
+ self._validate_parameters_merge(parameters)
+
+ return attrs
+
+
+class PostDocumentSerializer(serializers.Serializer):
+ created = serializers.DateTimeField(
+ label="Created",
+ allow_null=True,
+ write_only=True,
+ required=False,
+ )
+
+ document = serializers.FileField(
+ label="Document",
+ write_only=True,
+ )
+
+ title = serializers.CharField(
+ label="Title",
+ write_only=True,
+ required=False,
+ )
+
+ correspondent = serializers.PrimaryKeyRelatedField(
+ queryset=Correspondent.objects.all(),
+ label="Correspondent",
+ allow_null=True,
+ write_only=True,
+ required=False,
+ )
+
+ document_type = serializers.PrimaryKeyRelatedField(
+ queryset=DocumentType.objects.all(),
+ label="Document type",
+ allow_null=True,
+ write_only=True,
+ required=False,
+ )
+
+ storage_path = serializers.PrimaryKeyRelatedField(
+ queryset=StoragePath.objects.all(),
+ label="Storage path",
+ allow_null=True,
+ write_only=True,
+ required=False,
+ )
+
+ tags = serializers.PrimaryKeyRelatedField(
+ many=True,
+ queryset=Tag.objects.all(),
+ label="Tags",
+ write_only=True,
+ required=False,
+ )
+
+ archive_serial_number = serializers.IntegerField(
+ label="ASN",
+ write_only=True,
+ required=False,
+ min_value=Document.ARCHIVE_SERIAL_NUMBER_MIN,
+ max_value=Document.ARCHIVE_SERIAL_NUMBER_MAX,
+ )
+
+ custom_fields = serializers.PrimaryKeyRelatedField(
+ many=True,
+ queryset=CustomField.objects.all(),
+ label="Custom fields",
+ write_only=True,
+ required=False,
+ )
+
+ from_webui = serializers.BooleanField(
+ label="Documents are from Paperless-ngx WebUI",
+ write_only=True,
+ required=False,
+ )
+
+ def validate_document(self, document):
+ document_data = document.file.read()
+ mime_type = magic.from_buffer(document_data, mime=True)
+
+ if not is_mime_type_supported(mime_type):
+ if (
+ mime_type in settings.CONSUMER_PDF_RECOVERABLE_MIME_TYPES
+ and document.name.endswith(
+ ".pdf",
+ )
+ ):
+ # If the file is an invalid PDF, we can try to recover it later in the consumer
+ mime_type = "application/pdf"
+ else:
+ raise serializers.ValidationError(
+ _("File type %(type)s not supported") % {"type": mime_type},
+ )
+
+ return document.name, document_data
+
+ def validate_correspondent(self, correspondent):
+ if correspondent:
+ return correspondent.id
+ else:
+ return None
+
+ def validate_document_type(self, document_type):
+ if document_type:
+ return document_type.id
+ else:
+ return None
+
+ def validate_storage_path(self, storage_path):
+ if storage_path:
+ return storage_path.id
+ else:
+ return None
+
+ def validate_tags(self, tags):
+ if tags:
+ return [tag.id for tag in tags]
+ else:
+ return None
+
+ def validate_custom_fields(self, custom_fields):
+ if custom_fields:
+ return [custom_field.id for custom_field in custom_fields]
+ else:
+ return None
+
+
+class BulkDownloadSerializer(DocumentListSerializer):
+ content = serializers.ChoiceField(
+ choices=["archive", "originals", "both"],
+ default="archive",
+ )
+
+ compression = serializers.ChoiceField(
+ choices=["none", "deflated", "bzip2", "lzma"],
+ default="none",
+ )
+
+ follow_formatting = serializers.BooleanField(
+ default=False,
+ )
+
+ def validate_compression(self, compression):
+ import zipfile
+
+ return {
+ "none": zipfile.ZIP_STORED,
+ "deflated": zipfile.ZIP_DEFLATED,
+ "bzip2": zipfile.ZIP_BZIP2,
+ "lzma": zipfile.ZIP_LZMA,
+ }[compression]
+
+
+class StoragePathSerializer(MatchingModelSerializer, OwnedObjectSerializer):
+ class Meta:
+ model = StoragePath
+ fields = (
+ "id",
+ "slug",
+ "name",
+ "path",
+ "match",
+ "matching_algorithm",
+ "is_insensitive",
+ "document_count",
+ "owner",
+ "permissions",
+ "user_can_change",
+ "set_permissions",
+ )
+
+ def validate_path(self, path: str):
+ converted_path = convert_format_str_to_template_format(path)
+ if converted_path != path:
+ logger.warning(
+ f"Storage path {path} is not using the new style format, consider updating",
+ )
+ result = validate_filepath_template_and_render(converted_path)
+
+ if result is None:
+ raise serializers.ValidationError(_("Invalid variable detected."))
+
+ return converted_path
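+ # Sketch of the intent (partly an assumption based on the helper names): legacy
+ # "{placeholder}"-style paths are rewritten into the newer template syntax by
+ # convert_format_str_to_template_format, a warning is logged whenever a rewrite
+ # actually happened, and the rendered result is rejected if it references an
+ # unknown variable.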
+
+ def update(self, instance, validated_data):
+ """
+ When a storage path is updated, see if documents
+ using it require a rename/move
+ """
+ doc_ids = [doc.id for doc in instance.documents.all()]
+ if len(doc_ids):
+ bulk_edit.bulk_update_documents.delay(doc_ids)
+
+ return super().update(instance, validated_data)
+
+
+class UiSettingsViewSerializer(serializers.ModelSerializer):
+ class Meta:
+ model = UiSettings
+ depth = 1
+ fields = [
+ "id",
+ "settings",
+ ]
+
+ def validate_settings(self, settings):
+ # never persist the update-checking backend setting
+ if "update_checking" in settings:
+ try:
+ settings["update_checking"].pop("backend_setting")
+ except KeyError:
+ pass
+ return settings
+
+ def create(self, validated_data):
+ ui_settings = UiSettings.objects.update_or_create(
+ user=validated_data.get("user"),
+ defaults={"settings": validated_data.get("settings", None)},
+ )
+ return ui_settings
+
+
+class TasksViewSerializer(OwnedObjectSerializer):
+ class Meta:
+ model = PaperlessTask
+ fields = (
+ "id",
+ "task_id",
+ "task_name",
+ "task_file_name",
+ "date_created",
+ "date_done",
+ "type",
+ "status",
+ "result",
+ "acknowledged",
+ "related_document",
+ "owner",
+ )
+
+ related_document = serializers.SerializerMethodField()
+ created_doc_re = re.compile(r"New document id (\d+) created")
+ duplicate_doc_re = re.compile(r"It is a duplicate of .* \(#(\d+)\)")
+
+ def get_related_document(self, obj) -> str | None:
+ result = None
+ re = None
+ if obj.result:
+ match obj.status:
+ case states.SUCCESS:
+ re = self.created_doc_re
+ case states.FAILURE:
+ re = (
+ self.duplicate_doc_re
+ if "existing document is in the trash" not in obj.result
+ else None
+ )
+ if re is not None:
+ try:
+ result = re.search(obj.result).group(1)
+ except Exception:
+ pass
+
+ return result
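+ # Example outputs (illustrative): a SUCCESS result of "New document id 123
+ # created" yields "123"; a FAILURE duplicate message such as
+ # "... It is a duplicate of scan.pdf (#42)" yields "42", unless the existing
+ # document is in the trash, in which case None is returned.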
+
+
+class RunTaskViewSerializer(serializers.Serializer):
+ task_name = serializers.ChoiceField(
+ choices=PaperlessTask.TaskName.choices,
+ label="Task Name",
+ write_only=True,
+ )
+
+
+class AcknowledgeTasksViewSerializer(serializers.Serializer):
+ tasks = serializers.ListField(
+ required=True,
+ label="Tasks",
+ write_only=True,
+ child=serializers.IntegerField(),
+ )
+
+ def _validate_task_id_list(self, tasks, name="tasks"):
+ if not isinstance(tasks, list):
+ raise serializers.ValidationError(f"{name} must be a list")
+ if not all(isinstance(i, int) for i in tasks):
+ raise serializers.ValidationError(f"{name} must be a list of integers")
+ count = PaperlessTask.objects.filter(id__in=tasks).count()
+ if not count == len(tasks):
+ raise serializers.ValidationError(
+ f"Some tasks in {name} don't exist or were specified twice.",
+ )
+
+ def validate_tasks(self, tasks):
+ self._validate_task_id_list(tasks)
+ return tasks
+
+
+class ShareLinkSerializer(OwnedObjectSerializer):
+ class Meta:
+ model = ShareLink
+ fields = (
+ "id",
+ "created",
+ "expiration",
+ "slug",
+ "document",
+ "file_version",
+ )
+
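+ # The slug acts as the unguessable share token, so generate a long random value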
+ def create(self, validated_data):
+ validated_data["slug"] = get_random_string(50)
+ return super().create(validated_data)
+
+
+class BulkEditObjectsSerializer(SerializerWithPerms, SetPermissionsMixin):
+ objects = serializers.ListField(
+ required=True,
+ allow_empty=False,
+ label="Objects",
+ write_only=True,
+ child=serializers.IntegerField(),
+ )
+
+ object_type = serializers.ChoiceField(
+ choices=[
+ "tags",
+ "correspondents",
+ "document_types",
+ "storage_paths",
+ ],
+ label="Object Type",
+ write_only=True,
+ )
+
+ operation = serializers.ChoiceField(
+ choices=[
+ "set_permissions",
+ "delete",
+ ],
+ label="Operation",
+ required=True,
+ write_only=True,
+ )
+
+ owner = serializers.PrimaryKeyRelatedField(
+ queryset=User.objects.all(),
+ required=False,
+ allow_null=True,
+ )
+
+ permissions = serializers.DictField(
+ label="Set permissions",
+ allow_empty=False,
+ required=False,
+ write_only=True,
+ )
+
+ merge = serializers.BooleanField(
+ default=False,
+ write_only=True,
+ required=False,
+ )
+
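+ # Resolve the object_type choice string to its model class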
+ def get_object_class(self, object_type):
+ object_class = None
+ if object_type == "tags":
+ object_class = Tag
+ elif object_type == "correspondents":
+ object_class = Correspondent
+ elif object_type == "document_types":
+ object_class = DocumentType
+ elif object_type == "storage_paths":
+ object_class = StoragePath
+ return object_class
+
+ def _validate_objects(self, objects, object_type):
+ if not isinstance(objects, list):
+ raise serializers.ValidationError("objects must be a list")
+ if not all(isinstance(i, int) for i in objects):
+ raise serializers.ValidationError("objects must be a list of integers")
+ object_class = self.get_object_class(object_type)
+ count = object_class.objects.filter(id__in=objects).count()
+ if not count == len(objects):
+ raise serializers.ValidationError(
+ "Some ids in objects don't exist or were specified twice.",
+ )
+ return objects
+
+ def _validate_permissions(self, permissions):
+ self.validate_set_permissions(
+ permissions,
+ )
+
+ def validate(self, attrs):
+ object_type = attrs["object_type"]
+ objects = attrs["objects"]
+ operation = attrs.get("operation")
+
+ self._validate_objects(objects, object_type)
+
+ if operation == "set_permissions":
+ permissions = attrs.get("permissions")
+ if permissions is not None:
+ self._validate_permissions(permissions)
+
+ return attrs
+
+
+class WorkflowTriggerSerializer(serializers.ModelSerializer):
+ id = serializers.IntegerField(required=False, allow_null=True)
+ sources = fields.MultipleChoiceField(
+ choices=WorkflowTrigger.DocumentSourceChoices.choices,
+ allow_empty=True,
+ default={
+ DocumentSource.ConsumeFolder,
+ DocumentSource.ApiUpload,
+ DocumentSource.MailFetch,
+ },
+ )
+
+ type = serializers.ChoiceField(
+ choices=WorkflowTrigger.WorkflowTriggerType.choices,
+ label="Trigger Type",
+ )
+
+ class Meta:
+ model = WorkflowTrigger
+ fields = [
+ "id",
+ "sources",
+ "type",
+ "filter_path",
+ "filter_filename",
+ "filter_mailrule",
+ "matching_algorithm",
+ "match",
+ "is_insensitive",
+ "filter_has_tags",
+ "filter_has_correspondent",
+ "filter_has_document_type",
+ "schedule_offset_days",
+ "schedule_is_recurring",
+ "schedule_recurring_interval_days",
+ "schedule_date_field",
+ "schedule_date_custom_field",
+ ]
+
+ def validate(self, attrs):
+ # Empty strings treated as None to avoid unexpected behavior
+ if (
+ "filter_filename" in attrs
+ and attrs["filter_filename"] is not None
+ and len(attrs["filter_filename"]) == 0
+ ):
+ attrs["filter_filename"] = None
+ if (
+ "filter_path" in attrs
+ and attrs["filter_path"] is not None
+ and len(attrs["filter_path"]) == 0
+ ):
+ attrs["filter_path"] = None
+
+ if (
+ attrs["type"] == WorkflowTrigger.WorkflowTriggerType.CONSUMPTION
+ and "filter_mailrule" not in attrs
+ and ("filter_filename" not in attrs or attrs["filter_filename"] is None)
+ and ("filter_path" not in attrs or attrs["filter_path"] is None)
+ ):
+ raise serializers.ValidationError(
+ "File name, path or mail rule filter are required",
+ )
+
+ return attrs
+
+
+class WorkflowActionEmailSerializer(serializers.ModelSerializer):
+ id = serializers.IntegerField(allow_null=True, required=False)
+
+ class Meta:
+ model = WorkflowActionEmail
+ fields = [
+ "id",
+ "subject",
+ "body",
+ "to",
+ "include_document",
+ ]
+
+
+class WorkflowActionWebhookSerializer(serializers.ModelSerializer):
+ id = serializers.IntegerField(allow_null=True, required=False)
+
+ def validate_url(self, url):
+ url_validator(url)
+ return url
+
+ class Meta:
+ model = WorkflowActionWebhook
+ fields = [
+ "id",
+ "url",
+ "use_params",
+ "as_json",
+ "params",
+ "body",
+ "headers",
+ "include_document",
+ ]
+
+
+class WorkflowActionSerializer(serializers.ModelSerializer):
+ id = serializers.IntegerField(required=False, allow_null=True)
+ assign_correspondent = CorrespondentField(allow_null=True, required=False)
+ assign_tags = TagsField(many=True, allow_null=True, required=False)
+ assign_document_type = DocumentTypeField(allow_null=True, required=False)
+ assign_storage_path = StoragePathField(allow_null=True, required=False)
+ email = WorkflowActionEmailSerializer(allow_null=True, required=False)
+ webhook = WorkflowActionWebhookSerializer(allow_null=True, required=False)
+
+ class Meta:
+ model = WorkflowAction
+ fields = [
+ "id",
+ "type",
+ "assign_title",
+ "assign_tags",
+ "assign_correspondent",
+ "assign_document_type",
+ "assign_storage_path",
+ "assign_owner",
+ "assign_view_users",
+ "assign_view_groups",
+ "assign_change_users",
+ "assign_change_groups",
+ "assign_custom_fields",
+ "assign_custom_fields_values",
+ "remove_all_tags",
+ "remove_tags",
+ "remove_all_correspondents",
+ "remove_correspondents",
+ "remove_all_document_types",
+ "remove_document_types",
+ "remove_all_storage_paths",
+ "remove_storage_paths",
+ "remove_custom_fields",
+ "remove_all_custom_fields",
+ "remove_all_owners",
+ "remove_owners",
+ "remove_all_permissions",
+ "remove_view_users",
+ "remove_view_groups",
+ "remove_change_users",
+ "remove_change_groups",
+ "email",
+ "webhook",
+ ]
+
+ def validate(self, attrs):
+ if "assign_title" in attrs and attrs["assign_title"] is not None:
+ if len(attrs["assign_title"]) == 0:
+ # Empty strings treated as None to avoid unexpected behavior
+ attrs["assign_title"] = None
+ else:
+ try:
+ # test against all placeholders, see consumer.py `parse_doc_title_w_placeholders`
+ attrs["assign_title"].format(
+ correspondent="",
+ document_type="",
+ added="",
+ added_year="",
+ added_year_short="",
+ added_month="",
+ added_month_name="",
+ added_month_name_short="",
+ added_day="",
+ added_time="",
+ owner_username="",
+ original_filename="",
+ filename="",
+ created="",
+ created_year="",
+ created_year_short="",
+ created_month="",
+ created_month_name="",
+ created_month_name_short="",
+ created_day="",
+ created_time="",
+ )
+ except (ValueError, KeyError) as e:
+ raise serializers.ValidationError(
+ {"assign_title": f'Invalid f-string detected: "{e.args[0]}"'},
+ )
+
+ if (
+ "type" in attrs
+ and attrs["type"] == WorkflowAction.WorkflowActionType.EMAIL
+ and "email" not in attrs
+ ):
+ raise serializers.ValidationError(
+ "Email data is required for email actions",
+ )
+
+ if (
+ "type" in attrs
+ and attrs["type"] == WorkflowAction.WorkflowActionType.WEBHOOK
+ and "webhook" not in attrs
+ ):
+ raise serializers.ValidationError(
+ "Webhook data is required for webhook actions",
+ )
+
+ return attrs
+
+
+class WorkflowSerializer(serializers.ModelSerializer):
+ order = serializers.IntegerField(required=False)
+
+ triggers = WorkflowTriggerSerializer(many=True)
+ actions = WorkflowActionSerializer(many=True)
+
+ class Meta:
+ model = Workflow
+ fields = [
+ "id",
+ "name",
+ "order",
+ "enabled",
+ "triggers",
+ "actions",
+ ]
+
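+ # Nested triggers/actions are upserted with update_or_create and then attached
+ # to the workflow via the M2M relations; many-to-many sub-fields are popped
+ # first because they cannot be passed through update_or_create defaults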
+ def update_triggers_and_actions(self, instance: Workflow, triggers, actions):
+ set_triggers = []
+ set_actions = []
+
+ if triggers is not None:
+ for trigger in triggers:
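+ # filter_has_tags is an M2M field, so it is set() after the upsert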
+ filter_has_tags = trigger.pop("filter_has_tags", None)
+ trigger_instance, _ = WorkflowTrigger.objects.update_or_create(
+ id=trigger.get("id"),
+ defaults=trigger,
+ )
+ if filter_has_tags is not None:
+ trigger_instance.filter_has_tags.set(filter_has_tags)
+ set_triggers.append(trigger_instance)
+
+ if actions is not None:
+ for action in actions:
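+ # Pop all M2M assignments/removals up front; they are applied with set() below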
+ assign_tags = action.pop("assign_tags", None)
+ assign_view_users = action.pop("assign_view_users", None)
+ assign_view_groups = action.pop("assign_view_groups", None)
+ assign_change_users = action.pop("assign_change_users", None)
+ assign_change_groups = action.pop("assign_change_groups", None)
+ assign_custom_fields = action.pop("assign_custom_fields", None)
+ remove_tags = action.pop("remove_tags", None)
+ remove_correspondents = action.pop("remove_correspondents", None)
+ remove_document_types = action.pop("remove_document_types", None)
+ remove_storage_paths = action.pop("remove_storage_paths", None)
+ remove_custom_fields = action.pop("remove_custom_fields", None)
+ remove_owners = action.pop("remove_owners", None)
+ remove_view_users = action.pop("remove_view_users", None)
+ remove_view_groups = action.pop("remove_view_groups", None)
+ remove_change_users = action.pop("remove_change_users", None)
+ remove_change_groups = action.pop("remove_change_groups", None)
+
+ email_data = action.pop("email", None)
+ webhook_data = action.pop("webhook", None)
+
+ action_instance, _ = WorkflowAction.objects.update_or_create(
+ id=action.get("id"),
+ defaults=action,
+ )
+
+ if email_data is not None:
+ serializer = WorkflowActionEmailSerializer(data=email_data)
+ serializer.is_valid(raise_exception=True)
+ email, _ = WorkflowActionEmail.objects.update_or_create(
+ id=email_data.get("id"),
+ defaults=serializer.validated_data,
+ )
+ action_instance.email = email
+ action_instance.save()
+
+ if webhook_data is not None:
+ serializer = WorkflowActionWebhookSerializer(data=webhook_data)
+ serializer.is_valid(raise_exception=True)
+ webhook, _ = WorkflowActionWebhook.objects.update_or_create(
+ id=webhook_data.get("id"),
+ defaults=serializer.validated_data,
+ )
+ action_instance.webhook = webhook
+ action_instance.save()
+
+ if assign_tags is not None:
+ action_instance.assign_tags.set(assign_tags)
+ if assign_view_users is not None:
+ action_instance.assign_view_users.set(assign_view_users)
+ if assign_view_groups is not None:
+ action_instance.assign_view_groups.set(assign_view_groups)
+ if assign_change_users is not None:
+ action_instance.assign_change_users.set(assign_change_users)
+ if assign_change_groups is not None:
+ action_instance.assign_change_groups.set(assign_change_groups)
+ if assign_custom_fields is not None:
+ action_instance.assign_custom_fields.set(assign_custom_fields)
+ if remove_tags is not None:
+ action_instance.remove_tags.set(remove_tags)
+ if remove_correspondents is not None:
+ action_instance.remove_correspondents.set(remove_correspondents)
+ if remove_document_types is not None:
+ action_instance.remove_document_types.set(remove_document_types)
+ if remove_storage_paths is not None:
+ action_instance.remove_storage_paths.set(remove_storage_paths)
+ if remove_custom_fields is not None:
+ action_instance.remove_custom_fields.set(remove_custom_fields)
+ if remove_owners is not None:
+ action_instance.remove_owners.set(remove_owners)
+ if remove_view_users is not None:
+ action_instance.remove_view_users.set(remove_view_users)
+ if remove_view_groups is not None:
+ action_instance.remove_view_groups.set(remove_view_groups)
+ if remove_change_users is not None:
+ action_instance.remove_change_users.set(remove_change_users)
+ if remove_change_groups is not None:
+ action_instance.remove_change_groups.set(remove_change_groups)
+
+ set_actions.append(action_instance)
+
+ instance.triggers.set(set_triggers)
+ instance.actions.set(set_actions)
+ instance.save()
+
+ def prune_triggers_and_actions(self):
+ """
+ ManyToMany fields don't support e.g. on_delete, so unattached
+ triggers and actions need to be discarded manually
+ """
+ for trigger in WorkflowTrigger.objects.all():
+ if trigger.workflows.all().count() == 0:
+ trigger.delete()
+
+ for action in WorkflowAction.objects.all():
+ if action.workflows.all().count() == 0:
+ action.delete()
+
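+ # Also drop email/webhook configurations that no longer back any action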
+ WorkflowActionEmail.objects.filter(action=None).delete()
+ WorkflowActionWebhook.objects.filter(action=None).delete()
+
+ def create(self, validated_data) -> Workflow:
+ # Pop nested data before the default create, which cannot handle it directly
+ triggers = validated_data.pop("triggers", None)
+ actions = validated_data.pop("actions", None)
+
+ instance = super().create(validated_data)
+
+ self.update_triggers_and_actions(instance, triggers, actions)
+
+ return instance
+
+ def update(self, instance: Workflow, validated_data) -> Workflow:
+ # Nested triggers/actions are handled manually after the base update
+ triggers = validated_data.pop("triggers", None)
+ actions = validated_data.pop("actions", None)
+
+ instance = super().update(instance, validated_data)
+
+ self.update_triggers_and_actions(instance, triggers, actions)
+
+ self.prune_triggers_and_actions()
+
+ return instance
+
+
+class TrashSerializer(SerializerWithPerms):
+ documents = serializers.ListField(
+ required=False,
+ label="Documents",
+ write_only=True,
+ child=serializers.IntegerField(),
+ )
+
+ action = serializers.ChoiceField(
+ choices=["restore", "empty"],
+ label="Action",
+ write_only=True,
+ )
+
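+ # Only documents that are already in the trash can be restored or purged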
+ def validate_documents(self, documents):
+ count = Document.deleted_objects.filter(id__in=documents).count()
+ if not count == len(documents):
+ raise serializers.ValidationError(
+ "Some documents in the list have not yet been deleted.",
+ )
+ return documents
+
+
+class StoragePathTestSerializer(SerializerWithPerms):
+ path = serializers.CharField(
+ required=True,
+ label="Path",
+ write_only=True,
+ )
+
+ document = serializers.PrimaryKeyRelatedField(
+ queryset=Document.objects.all(),
+ required=True,
+ label="Document",
+ write_only=True,
+ )
from documents.permissions import has_perms_owner_aware
from documents.permissions import set_permissions_for_object
from documents.schema import generate_object_with_permissions_schema
-from documents.serialisers import AcknowledgeTasksViewSerializer
-from documents.serialisers import BulkDownloadSerializer
-from documents.serialisers import BulkEditObjectsSerializer
-from documents.serialisers import BulkEditSerializer
-from documents.serialisers import CorrespondentSerializer
-from documents.serialisers import CustomFieldSerializer
-from documents.serialisers import DocumentListSerializer
-from documents.serialisers import DocumentSerializer
-from documents.serialisers import DocumentTypeSerializer
-from documents.serialisers import PostDocumentSerializer
-from documents.serialisers import RunTaskViewSerializer
-from documents.serialisers import SavedViewSerializer
-from documents.serialisers import SearchResultSerializer
-from documents.serialisers import ShareLinkSerializer
-from documents.serialisers import StoragePathSerializer
-from documents.serialisers import StoragePathTestSerializer
-from documents.serialisers import TagSerializer
-from documents.serialisers import TagSerializerVersion1
-from documents.serialisers import TasksViewSerializer
-from documents.serialisers import TrashSerializer
-from documents.serialisers import UiSettingsViewSerializer
-from documents.serialisers import WorkflowActionSerializer
-from documents.serialisers import WorkflowSerializer
-from documents.serialisers import WorkflowTriggerSerializer
from documents.signals import document_updated
from documents.tasks import consume_file
from documents.tasks import empty_trash
from paperless.models import Workflow
from paperless.models import WorkflowAction
from paperless.models import WorkflowTrigger
+from paperless.serialisers import AcknowledgeTasksViewSerializer
from paperless.serialisers import ApplicationConfigurationSerializer
+from paperless.serialisers import BulkDownloadSerializer
+from paperless.serialisers import BulkEditObjectsSerializer
+from paperless.serialisers import BulkEditSerializer
+from paperless.serialisers import CorrespondentSerializer
+from paperless.serialisers import CustomFieldSerializer
+from paperless.serialisers import DocumentListSerializer
+from paperless.serialisers import DocumentSerializer
+from paperless.serialisers import DocumentTypeSerializer
from paperless.serialisers import GroupSerializer
from paperless.serialisers import PaperlessAuthTokenSerializer
+from paperless.serialisers import PostDocumentSerializer
from paperless.serialisers import ProfileSerializer
+from paperless.serialisers import RunTaskViewSerializer
+from paperless.serialisers import SavedViewSerializer
+from paperless.serialisers import SearchResultSerializer
+from paperless.serialisers import ShareLinkSerializer
+from paperless.serialisers import StoragePathSerializer
+from paperless.serialisers import StoragePathTestSerializer
+from paperless.serialisers import TagSerializer
+from paperless.serialisers import TagSerializerVersion1
+from paperless.serialisers import TasksViewSerializer
+from paperless.serialisers import TrashSerializer
+from paperless.serialisers import UiSettingsViewSerializer
from paperless.serialisers import UserSerializer
+from paperless.serialisers import WorkflowActionSerializer
+from paperless.serialisers import WorkflowSerializer
+from paperless.serialisers import WorkflowTriggerSerializer
from paperless_mail.models import MailAccount
from paperless_mail.models import MailRule
from paperless_mail.oauth import PaperlessMailOAuth2Manager
from rest_framework import serializers
-from documents.serialisers import CorrespondentField
-from documents.serialisers import DocumentTypeField
-from documents.serialisers import OwnedObjectSerializer
-from documents.serialisers import TagsField
+from paperless.serialisers import CorrespondentField
+from paperless.serialisers import DocumentTypeField
+from paperless.serialisers import ObfuscatedPasswordField
+from paperless.serialisers import OwnedObjectSerializer
+from paperless.serialisers import TagsField
from paperless_mail.models import MailAccount
from paperless_mail.models import MailRule
-class ObfuscatedPasswordField(serializers.CharField):
- """
- Sends *** string instead of password in the clear
- """
-
- def to_representation(self, value) -> str:
- return "*" * max(10, len(value))
-
- def to_internal_value(self, data):
- return data
-
-
class MailAccountSerializer(OwnedObjectSerializer):
password = ObfuscatedPasswordField()