shared_object_pks = self.get_shared_object_pks([obj])
return obj.owner == self.user and obj.id in shared_object_pks
- permissions = SerializerMethodField(read_only=True)
- user_can_change = SerializerMethodField(read_only=True)
- is_shared_by_requester = SerializerMethodField(read_only=True)
+ permissions = SerializerMethodField(read_only=True, required=False)
+ user_can_change = SerializerMethodField(read_only=True, required=False)
+ is_shared_by_requester = SerializerMethodField(read_only=True, required=False)
set_permissions = SetPermissionsSerializer(
label="Set permissions",
set_triggers = []
set_actions = []
- if triggers is not None:
+ if triggers is not None and triggers is not serializers.empty:
for trigger in triggers:
filter_has_tags = trigger.pop("filter_has_tags", None)
trigger_instance, _ = WorkflowTrigger.objects.update_or_create(
trigger_instance.filter_has_tags.set(filter_has_tags)
set_triggers.append(trigger_instance)
- if actions is not None:
+ if actions is not None and actions is not serializers.empty:
for action in actions:
assign_tags = action.pop("assign_tags", None)
assign_view_users = action.pop("assign_view_users", None)
set_actions.append(action_instance)
- instance.triggers.set(set_triggers)
- instance.actions.set(set_actions)
+ if triggers is not serializers.empty:
+ instance.triggers.set(set_triggers)
+ if actions is not serializers.empty:
+ instance.actions.set(set_actions)
instance.save()
def prune_triggers_and_actions(self):
"""
ManyToMany fields dont support e.g. on_delete so we need to discard unattached
- triggers and actionas manually
+ triggers and actions manually
"""
for trigger in WorkflowTrigger.objects.all():
if trigger.workflows.all().count() == 0:
return instance
def update(self, instance: Workflow, validated_data) -> Workflow:
- if "triggers" in validated_data:
- triggers = validated_data.pop("triggers")
-
- if "actions" in validated_data:
- actions = validated_data.pop("actions")
+ triggers = validated_data.pop("triggers", serializers.empty)
+ actions = validated_data.pop("actions", serializers.empty)
instance = super().update(instance, validated_data)
self.update_triggers_and_actions(instance, triggers, actions)
-
self.prune_triggers_and_actions()
return instance
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
+
+
+class TestFullPermissionsFlag(APITestCase):
+ def setUp(self):
+ super().setUp()
+
+ self.admin = User.objects.create_superuser(username="admin")
+
+ def test_full_perms_flag(self):
+ """
+ GIVEN:
+ - API request to list documents
+ WHEN:
+ - full_perms flag is set to true, 1, false, or a random value
+ THEN:
+ - Permissions field is included or excluded accordingly
+ """
+ self.client.force_authenticate(self.admin)
+ Document.objects.create(title="Doc", checksum="xyz", owner=self.admin)
+
+ resp = self.client.get("/api/documents/?full_perms=true")
+ self.assertIn("permissions", resp.data["results"][0])
+
+ resp = self.client.get("/api/documents/?full_perms=1")
+ self.assertIn("permissions", resp.data["results"][0])
+
+ resp = self.client.get("/api/documents/?full_perms=false")
+ self.assertNotIn("permissions", resp.data["results"][0])
+
+ resp = self.client.get("/api/documents/?full_perms=garbage")
+ self.assertNotIn("permissions", resp.data["results"][0])
self.assertEqual(workflow.triggers.first().filter_has_tags.first(), self.t1)
self.assertEqual(workflow.actions.first().assign_title, "Action New Title")
+ def test_api_update_workflow_no_trigger_actions(self):
+ """
+ GIVEN:
+ - Existing workflow
+ WHEN:
+ - API request to update an existing workflow with no triggers and actions
+ - API request to update an existing workflow with empty actions and no triggers
+ THEN:
+ - No changes are made to the workflow
+ - Actions are removed, but triggers are not
+ """
+ response = self.client.patch(
+ f"{self.ENDPOINT}{self.workflow.id}/",
+ json.dumps(
+ {
+ "name": "Workflow Updated",
+ "order": 1,
+ },
+ ),
+ content_type="application/json",
+ )
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ workflow = Workflow.objects.get(id=self.workflow.id)
+ self.assertEqual(workflow.name, "Workflow Updated")
+ self.assertEqual(workflow.triggers.count(), 1)
+ self.assertEqual(workflow.actions.count(), 1)
+
+ response = self.client.patch(
+ f"{self.ENDPOINT}{self.workflow.id}/",
+ json.dumps(
+ {
+ "name": "Workflow Updated 2",
+ "order": 1,
+ "actions": [],
+ },
+ ),
+ content_type="application/json",
+ )
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ workflow = Workflow.objects.get(id=self.workflow.id)
+ self.assertEqual(workflow.name, "Workflow Updated 2")
+ self.assertEqual(workflow.triggers.count(), 1)
+ self.assertEqual(workflow.actions.count(), 0)
+
def test_api_auto_remove_orphaned_triggers_actions(self):
"""
GIVEN:
content_type="application/json",
)
self.assertEqual(response.status_code, expected_resp_code)
+
+ def test_patch_trigger_cannot_change_id(self):
+ """
+ GIVEN:
+ - An existing workflow trigger
+ - An existing workflow action
+ WHEN:
+ - PATCHing the trigger with a different 'id' in the body
+ - PATCHing the action with a different 'id' in the body
+ THEN:
+ - HTTP 400 error is returned
+ """
+ response = self.client.patch(
+ f"/api/workflow_triggers/{self.trigger.id}/",
+ {
+ "id": self.trigger.id + 1,
+ "filter_filename": "patched.pdf",
+ },
+ format="json",
+ )
+
+ self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
+ self.trigger.refresh_from_db()
+ self.assertNotEqual(self.trigger.filter_filename, "patched.pdf")
+
+ response = self.client.patch(
+ f"/api/workflow_triggers/{self.trigger.id}/",
+ {
+ "id": self.trigger.id,
+ "filter_filename": "patched.pdf",
+ },
+ format="json",
+ )
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self.trigger.refresh_from_db()
+ self.assertEqual(self.trigger.filter_filename, "patched.pdf")
+
+ response = self.client.patch(
+ f"/api/workflow_actions/{self.action.id}/",
+ {
+ "id": self.action.id + 1,
+ "assign_title": "Patched Title",
+ },
+ format="json",
+ )
+ self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
+ self.action.refresh_from_db()
+ self.assertNotEqual(self.action.assign_title, "Patched Title")
+
+ response = self.client.patch(
+ f"/api/workflow_actions/{self.action.id}/",
+ {
+ "id": self.action.id,
+ "assign_title": "Patched Title",
+ },
+ format="json",
+ )
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self.action.refresh_from_db()
+ self.assertEqual(self.action.assign_title, "Patched Title")
import tempfile
import zipfile
from datetime import datetime
+from distutils.util import strtobool
from pathlib import Path
from time import mktime
from unicodedata import normalize
def get_serializer(self, *args, **kwargs):
kwargs.setdefault("user", self.request.user)
+ try:
+ full_perms = bool(
+ strtobool(str(self.request.query_params.get("full_perms", "false"))),
+ )
+ except ValueError:
+ full_perms = False
kwargs.setdefault(
"full_perms",
- self.request.query_params.get("full_perms", False),
+ full_perms,
)
return super().get_serializer(*args, **kwargs)
kwargs.setdefault("context", self.get_serializer_context())
kwargs.setdefault("fields", fields)
kwargs.setdefault("truncate_content", truncate_content.lower() in ["true", "1"])
+ try:
+ full_perms = bool(
+ strtobool(str(self.request.query_params.get("full_perms", "false"))),
+ )
+ except ValueError:
+ full_perms = False
kwargs.setdefault(
"full_perms",
- self.request.query_params.get("full_perms", False),
+ full_perms,
)
return super().get_serializer(*args, **kwargs)
queryset = WorkflowTrigger.objects.all()
+ def partial_update(self, request, *args, **kwargs):
+ if "id" in request.data and str(request.data["id"]) != str(kwargs["pk"]):
+ return HttpResponseBadRequest(
+ "ID in body does not match URL",
+ )
+ return super().partial_update(request, *args, **kwargs)
+
class WorkflowActionViewSet(ModelViewSet):
permission_classes = (IsAuthenticated, PaperlessObjectPermissions)
"assign_custom_fields",
)
+ def partial_update(self, request, *args, **kwargs):
+ if "id" in request.data and str(request.data["id"]) != str(kwargs["pk"]):
+ return HttpResponseBadRequest(
+ "ID in body does not match URL",
+ )
+ return super().partial_update(request, *args, **kwargs)
+
class WorkflowViewSet(ModelViewSet):
permission_classes = (IsAuthenticated, PaperlessObjectPermissions)
)
return super().update(request, *args, **kwargs)
+ @extend_schema(
+ request=None,
+ responses={
+ 200: OpenApiTypes.BOOL,
+ 404: OpenApiTypes.STR,
+ },
+ )
@action(detail=True, methods=["post"])
def deactivate_totp(self, request, pk=None):
request_user = request.user