return Group.objects.filter(id__in=group_object_perm_group_ids).distinct()
-def set_permissions_for_object(permissions, object):
+def set_permissions_for_object(permissions: list[str], object, merge: bool = False):
+ """
+ Set permissions for an object. The permissions are given as a list of strings
+ in the format "action_modelname", e.g. "view_document".
+
+ If merge is True, the permissions are merged with the existing permissions and
+ no users or groups are removed. If False, the permissions are set to exactly
+ the given list of users and groups.
+ """
+
for action in permissions:
permission = f"{action}_{object.__class__.__name__.lower()}"
# users
users_to_add = User.objects.filter(id__in=permissions[action]["users"])
- users_to_remove = get_users_with_perms(
- object,
- only_with_perms_in=[permission],
- with_group_users=False,
+ users_to_remove = (
+ get_users_with_perms(
+ object,
+ only_with_perms_in=[permission],
+ with_group_users=False,
+ )
+ if not merge
+ else User.objects.none()
)
if len(users_to_add) > 0 and len(users_to_remove) > 0:
users_to_remove = users_to_remove.exclude(id__in=users_to_add)
)
# groups
groups_to_add = Group.objects.filter(id__in=permissions[action]["groups"])
- groups_to_remove = get_groups_with_only_permission(
- object,
- permission,
+ groups_to_remove = (
+ get_groups_with_only_permission(
+ object,
+ permission,
+ )
+ if not merge
+ else Group.objects.none()
)
if len(groups_to_add) > 0 and len(groups_to_remove) > 0:
groups_to_remove = groups_to_remove.exclude(id__in=groups_to_add)
)
if (
- action.assign_view_users is not None
- or action.assign_view_groups is not None
- or action.assign_change_users is not None
- or action.assign_change_groups is not None
+ (
+ action.assign_view_users is not None
+ and action.assign_view_users.count() > 0
+ )
+ or (
+ action.assign_view_groups is not None
+ and action.assign_view_groups.count() > 0
+ )
+ or (
+ action.assign_change_users is not None
+ and action.assign_change_users.count() > 0
+ )
+ or (
+ action.assign_change_groups is not None
+ and action.assign_change_groups.count() > 0
+ )
):
permissions = {
"view": {
or [],
},
}
- set_permissions_for_object(permissions=permissions, object=document)
+ set_permissions_for_object(
+ permissions=permissions,
+ object=document,
+ merge=True,
+ )
if action.assign_custom_fields is not None:
for field in action.assign_custom_fields.all():
from django.contrib.auth.models import Group
from django.contrib.auth.models import User
+from django.db.models import QuerySet
from django.utils import timezone
+from guardian.shortcuts import assign_perm
+from guardian.shortcuts import get_groups_with_perms
+from guardian.shortcuts import get_users_with_perms
from rest_framework.test import APITestCase
from documents import tasks
self.user2 = User.objects.create(username="user2")
self.user3 = User.objects.create(username="user3")
self.group1 = Group.objects.create(name="group1")
+ self.group2 = Group.objects.create(name="group2")
account1 = MailAccount.objects.create(
name="Email1",
format="json",
)
- self.assertEqual(doc.custom_fields.all().count(), 1)
+ def test_document_updated_workflow_merge_permissions(self):
+ """
+ GIVEN:
+ - Existing workflow with UPDATED trigger and action that sets permissions
+ WHEN:
+ - Document is updated that already has permissions
+ THEN:
+ - Permissions are merged
+ """
+ trigger = WorkflowTrigger.objects.create(
+ type=WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED,
+ filter_has_document_type=self.dt,
+ )
+ action = WorkflowAction.objects.create()
+ action.assign_view_users.add(self.user3)
+ action.assign_change_users.add(self.user3)
+ action.assign_view_groups.add(self.group2)
+ action.save()
+
+ w = Workflow.objects.create(
+ name="Workflow 1",
+ order=0,
+ )
+ w.triggers.add(trigger)
+ w.actions.add(action)
+ w.save()
+
+ doc = Document.objects.create(
+ title="sample test",
+ correspondent=self.c,
+ original_filename="sample.pdf",
+ )
+
+ assign_perm("documents.view_document", self.user2, doc)
+ assign_perm("documents.change_document", self.user2, doc)
+ assign_perm("documents.view_document", self.group1, doc)
+ assign_perm("documents.change_document", self.group1, doc)
+
+ superuser = User.objects.create_superuser("superuser")
+ self.client.force_authenticate(user=superuser)
+
+ self.client.patch(
+ f"/api/documents/{doc.id}/",
+ {"document_type": self.dt.id},
+ format="json",
+ )
+
+ view_users_perms: QuerySet = get_users_with_perms(
+ doc,
+ only_with_perms_in=["view_document"],
+ )
+ change_users_perms: QuerySet = get_users_with_perms(
+ doc,
+ only_with_perms_in=["change_document"],
+ )
+ # user2 should still have permissions
+ self.assertIn(self.user2, view_users_perms)
+ self.assertIn(self.user2, change_users_perms)
+ # user3 should have been added
+ self.assertIn(self.user3, view_users_perms)
+ self.assertIn(self.user3, change_users_perms)
+
+ group_perms: QuerySet = get_groups_with_perms(doc)
+ # group1 should still have permissions
+ self.assertIn(self.group1, group_perms)
+ # group2 should have been added
+ self.assertIn(self.group2, group_perms)
def test_workflow_enabled_disabled(self):
trigger = WorkflowTrigger.objects.create(