]> git.ipfire.org Git - thirdparty/paperless-ngx.git/commitdiff
Fix: enforce permissions on bulk_edit operations 4007/head
authorshamoon <4887959+shamoon@users.noreply.github.com>
Thu, 17 Aug 2023 06:49:42 +0000 (23:49 -0700)
committershamoon <4887959+shamoon@users.noreply.github.com>
Thu, 17 Aug 2023 07:12:46 +0000 (00:12 -0700)
src/documents/tests/test_api.py
src/documents/views.py

index d788cf6a478f896077fe920515b6c8b5b840a91e..88180d4d8a1475af858c2bfee15911d638b5bbe9 100644 (file)
@@ -3453,6 +3453,110 @@ class TestBulkEdit(DirectoriesMixin, APITestCase):
         self.assertCountEqual(args[0], [self.doc2.id, self.doc3.id])
         self.assertEqual(len(kwargs["set_permissions"]["view"]["users"]), 2)
 
+    @mock.patch("documents.serialisers.bulk_edit.set_permissions")
+    def test_insufficient_permissions_ownership(self, m):
+        """
+        GIVEN:
+            - Documents owned by user other than logged in user
+        WHEN:
+            - set_permissions bulk edit API endpoint is called
+        THEN:
+            - User is not able to change permissions
+        """
+        m.return_value = "OK"
+        self.doc1.owner = User.objects.get(username="temp_admin")
+        self.doc1.save()
+        user1 = User.objects.create(username="user1")
+        self.client.force_authenticate(user=user1)
+
+        permissions = {
+            "owner": user1.id,
+        }
+
+        response = self.client.post(
+            "/api/documents/bulk_edit/",
+            json.dumps(
+                {
+                    "documents": [self.doc1.id, self.doc2.id, self.doc3.id],
+                    "method": "set_permissions",
+                    "parameters": {"set_permissions": permissions},
+                },
+            ),
+            content_type="application/json",
+        )
+
+        self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
+
+        m.assert_not_called()
+        self.assertEqual(response.content, b"Insufficient permissions")
+
+        response = self.client.post(
+            "/api/documents/bulk_edit/",
+            json.dumps(
+                {
+                    "documents": [self.doc2.id, self.doc3.id],
+                    "method": "set_permissions",
+                    "parameters": {"set_permissions": permissions},
+                },
+            ),
+            content_type="application/json",
+        )
+
+        self.assertEqual(response.status_code, status.HTTP_200_OK)
+        m.assert_called_once()
+
+    @mock.patch("documents.serialisers.bulk_edit.set_storage_path")
+    def test_insufficient_permissions_edit(self, m):
+        """
+        GIVEN:
+            - Documents for which current user only has view permissions
+        WHEN:
+            - API is called
+        THEN:
+            - set_storage_path is only called if user can edit all docs
+        """
+        m.return_value = "OK"
+        self.doc1.owner = User.objects.get(username="temp_admin")
+        self.doc1.save()
+        user1 = User.objects.create(username="user1")
+        assign_perm("view_document", user1, self.doc1)
+        self.client.force_authenticate(user=user1)
+
+        response = self.client.post(
+            "/api/documents/bulk_edit/",
+            json.dumps(
+                {
+                    "documents": [self.doc1.id, self.doc2.id, self.doc3.id],
+                    "method": "set_storage_path",
+                    "parameters": {"storage_path": self.sp1.id},
+                },
+            ),
+            content_type="application/json",
+        )
+
+        self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
+
+        m.assert_not_called()
+        self.assertEqual(response.content, b"Insufficient permissions")
+
+        assign_perm("change_document", user1, self.doc1)
+
+        response = self.client.post(
+            "/api/documents/bulk_edit/",
+            json.dumps(
+                {
+                    "documents": [self.doc1.id, self.doc2.id, self.doc3.id],
+                    "method": "set_storage_path",
+                    "parameters": {"storage_path": self.sp1.id},
+                },
+            ),
+            content_type="application/json",
+        )
+
+        self.assertEqual(response.status_code, status.HTTP_200_OK)
+
+        m.assert_called_once()
+
 
 class TestBulkDownload(DirectoriesMixin, APITestCase):
     ENDPOINT = "/api/documents/bulk_download/"
index d57ad4eeae6c32897e78c0378e4a9e4a2956ee22..b04b87243b8c2f6693cc3f81977f479a4d69094e 100644 (file)
@@ -54,6 +54,7 @@ from rest_framework.viewsets import ModelViewSet
 from rest_framework.viewsets import ReadOnlyModelViewSet
 from rest_framework.viewsets import ViewSet
 
+from documents import bulk_edit
 from documents.filters import ObjectOwnedOrGrantedPermissionsFilter
 from documents.permissions import PaperlessAdminPermissions
 from documents.permissions import PaperlessObjectPermissions
@@ -694,7 +695,7 @@ class SavedViewViewSet(ModelViewSet, PassUserMixin):
         serializer.save(owner=self.request.user)
 
 
-class BulkEditView(GenericAPIView):
+class BulkEditView(GenericAPIView, PassUserMixin):
     permission_classes = (IsAuthenticated,)
     serializer_class = BulkEditSerializer
     parser_classes = (parsers.JSONParser,)
@@ -703,10 +704,25 @@ class BulkEditView(GenericAPIView):
         serializer = self.get_serializer(data=request.data)
         serializer.is_valid(raise_exception=True)
 
+        user = self.request.user
         method = serializer.validated_data.get("method")
         parameters = serializer.validated_data.get("parameters")
         documents = serializer.validated_data.get("documents")
 
+        if not user.is_superuser:
+            document_objs = Document.objects.filter(pk__in=documents)
+            has_perms = (
+                all((doc.owner == user or doc.owner is None) for doc in document_objs)
+                if method == bulk_edit.set_permissions
+                else all(
+                    has_perms_owner_aware(user, "change_document", doc)
+                    for doc in document_objs
+                )
+            )
+
+            if not has_perms:
+                return HttpResponseForbidden("Insufficient permissions")
+
         try:
             # TODO: parameter validation
             result = method(documents, **parameters)