]> git.ipfire.org Git - thirdparty/paperless-ngx.git/commitdiff
Respect superuser for advanced queries, test coverage for object perms 3222/head
authorshamoon <4887959+shamoon@users.noreply.github.com>
Thu, 27 Apr 2023 22:00:03 +0000 (15:00 -0700)
committershamoon <4887959+shamoon@users.noreply.github.com>
Thu, 27 Apr 2023 22:51:34 +0000 (15:51 -0700)
src/documents/index.py
src/documents/tests/test_api.py
src/documents/views.py

index 973c99f4d046a06fed747986de3a3f53ab0336bb..6aef2c047920df2dff532c3aaa24e2f10a9600dd 100644 (file)
@@ -225,15 +225,19 @@ class DelayedQuery:
 
         user_criterias = [query.Term("has_owner", False)]
         if "user" in self.query_params:
-            user_criterias.append(query.Term("owner_id", self.query_params["user"]))
-            user_criterias.append(
-                query.Term("viewer_id", str(self.query_params["user"])),
-            )
+            if self.query_params["is_superuser"]:  # superusers see all docs
+                user_criterias = []
+            else:
+                user_criterias.append(query.Term("owner_id", self.query_params["user"]))
+                user_criterias.append(
+                    query.Term("viewer_id", str(self.query_params["user"])),
+                )
         if len(criterias) > 0:
-            criterias.append(query.Or(user_criterias))
+            if len(user_criterias) > 0:
+                criterias.append(query.Or(user_criterias))
             return query.And(criterias)
         else:
-            return query.Or(user_criterias)
+            return query.Or(user_criterias) if len(user_criterias) > 0 else None
 
     def _get_query_sortedby(self):
         if "ordering" not in self.query_params:
index d6158cd7db9becb482a44870eb54faba623d2e9a..b9989ee868bb53d9c5f9efff942fe5532aac992a 100644 (file)
@@ -27,6 +27,7 @@ from django.contrib.auth.models import Permission
 from django.contrib.auth.models import User
 from django.test import override_settings
 from django.utils import timezone
+from guardian.shortcuts import assign_perm
 from rest_framework import status
 from rest_framework.test import APITestCase
 from whoosh.writing import AsyncWriter
@@ -253,8 +254,6 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
         response = self.client.get(f"/api/documents/{doc.pk}/thumb/")
         self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
 
-        from guardian.shortcuts import assign_perm
-
         assign_perm("view_document", user2, doc)
 
         response = self.client.get(f"/api/documents/{doc.pk}/download/")
@@ -1064,6 +1063,92 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
             ),
         )
 
+    def test_search_filtering_respect_owner(self):
+        """
+        GIVEN:
+            - Documents with owners set & without
+        WHEN:
+            - API reuqest for advanced query (search) is made by non-superuser
+            - API reuqest for advanced query (search) is made by superuser
+        THEN:
+            - Only owned docs are returned for regular users
+            - All docs are returned for superuser
+        """
+        superuser = User.objects.create_superuser("superuser")
+        u1 = User.objects.create_user("user1")
+        u2 = User.objects.create_user("user2")
+        u1.user_permissions.add(*Permission.objects.filter(codename="view_document"))
+        u2.user_permissions.add(*Permission.objects.filter(codename="view_document"))
+
+        Document.objects.create(checksum="1", content="test 1", owner=u1)
+        Document.objects.create(checksum="2", content="test 2", owner=u2)
+        Document.objects.create(checksum="3", content="test 3", owner=u2)
+        Document.objects.create(checksum="4", content="test 4")
+
+        with AsyncWriter(index.open_index()) as writer:
+            for doc in Document.objects.all():
+                index.update_document(writer, doc)
+
+        self.client.force_authenticate(user=u1)
+        r = self.client.get("/api/documents/?query=test")
+        self.assertEqual(r.data["count"], 2)
+        r = self.client.get("/api/documents/?query=test&document_type__id__none=1")
+        self.assertEqual(r.data["count"], 2)
+
+        self.client.force_authenticate(user=u2)
+        r = self.client.get("/api/documents/?query=test")
+        self.assertEqual(r.data["count"], 3)
+        r = self.client.get("/api/documents/?query=test&document_type__id__none=1")
+        self.assertEqual(r.data["count"], 3)
+
+        self.client.force_authenticate(user=superuser)
+        r = self.client.get("/api/documents/?query=test")
+        self.assertEqual(r.data["count"], 4)
+        r = self.client.get("/api/documents/?query=test&document_type__id__none=1")
+        self.assertEqual(r.data["count"], 4)
+
+    def test_search_filtering_with_object_perms(self):
+        """
+        GIVEN:
+            - Documents with granted view permissions to others
+        WHEN:
+            - API reuqest for advanced query (search) is made by user
+        THEN:
+            - Only docs with granted view permissions are returned
+        """
+        u1 = User.objects.create_user("user1")
+        u2 = User.objects.create_user("user2")
+        u1.user_permissions.add(*Permission.objects.filter(codename="view_document"))
+        u2.user_permissions.add(*Permission.objects.filter(codename="view_document"))
+
+        Document.objects.create(checksum="1", content="test 1", owner=u1)
+        d2 = Document.objects.create(checksum="2", content="test 2", owner=u2)
+        d3 = Document.objects.create(checksum="3", content="test 3", owner=u2)
+        Document.objects.create(checksum="4", content="test 4")
+
+        with AsyncWriter(index.open_index()) as writer:
+            for doc in Document.objects.all():
+                index.update_document(writer, doc)
+
+        self.client.force_authenticate(user=u1)
+        r = self.client.get("/api/documents/?query=test")
+        self.assertEqual(r.data["count"], 2)
+        r = self.client.get("/api/documents/?query=test&document_type__id__none=1")
+        self.assertEqual(r.data["count"], 2)
+
+        assign_perm("view_document", u1, d2)
+        assign_perm("view_document", u1, d3)
+
+        with AsyncWriter(index.open_index()) as writer:
+            for doc in [d2, d3]:
+                index.update_document(writer, doc)
+
+        self.client.force_authenticate(user=u1)
+        r = self.client.get("/api/documents/?query=test")
+        self.assertEqual(r.data["count"], 4)
+        r = self.client.get("/api/documents/?query=test&document_type__id__none=1")
+        self.assertEqual(r.data["count"], 4)
+
     def test_search_sorting(self):
         c1 = Correspondent.objects.create(name="corres Ax")
         c2 = Correspondent.objects.create(name="corres Cx")
index 234c4dda19d517323785fb7214a3848a022cd826..0b450c3b30c354b8532468c5632964e363deb6b1 100644 (file)
@@ -604,6 +604,9 @@ class UnifiedSearchViewSet(DocumentViewSet):
                 # pass user to query for perms
                 self.request.query_params._mutable = True
                 self.request.query_params["user"] = self.request.user.id
+                self.request.query_params[
+                    "is_superuser"
+                ] = self.request.user.is_superuser
                 self.request.query_params._mutable = False
 
             if "query" in self.request.query_params: