from rest_framework import status
from rest_framework.test import APITestCase
-from documents import bulk_edit
from documents.models import Correspondent
from documents.models import Document
from documents.models import DocumentType
from documents.tests.utils import DirectoriesMixin
-class TestBulkEdit(DirectoriesMixin, APITestCase):
+class TestBulkEditAPI(DirectoriesMixin, APITestCase):
def setUp(self):
super().setUp()
self.doc4.tags.add(self.t1, self.t2)
self.sp1 = StoragePath.objects.create(name="sp1", path="Something/{checksum}")
- def test_set_correspondent(self):
- self.assertEqual(Document.objects.filter(correspondent=self.c2).count(), 1)
- bulk_edit.set_correspondent(
- [self.doc1.id, self.doc2.id, self.doc3.id],
- self.c2.id,
- )
- self.assertEqual(Document.objects.filter(correspondent=self.c2).count(), 3)
- self.async_task.assert_called_once()
- args, kwargs = self.async_task.call_args
- self.assertCountEqual(kwargs["document_ids"], [self.doc1.id, self.doc2.id])
-
- def test_unset_correspondent(self):
- self.assertEqual(Document.objects.filter(correspondent=self.c2).count(), 1)
- bulk_edit.set_correspondent([self.doc1.id, self.doc2.id, self.doc3.id], None)
- self.assertEqual(Document.objects.filter(correspondent=self.c2).count(), 0)
- self.async_task.assert_called_once()
- args, kwargs = self.async_task.call_args
- self.assertCountEqual(kwargs["document_ids"], [self.doc2.id, self.doc3.id])
-
- def test_set_document_type(self):
- self.assertEqual(Document.objects.filter(document_type=self.dt2).count(), 1)
- bulk_edit.set_document_type(
- [self.doc1.id, self.doc2.id, self.doc3.id],
- self.dt2.id,
- )
- self.assertEqual(Document.objects.filter(document_type=self.dt2).count(), 3)
- self.async_task.assert_called_once()
- args, kwargs = self.async_task.call_args
- self.assertCountEqual(kwargs["document_ids"], [self.doc1.id, self.doc2.id])
-
- def test_unset_document_type(self):
- self.assertEqual(Document.objects.filter(document_type=self.dt2).count(), 1)
- bulk_edit.set_document_type([self.doc1.id, self.doc2.id, self.doc3.id], None)
- self.assertEqual(Document.objects.filter(document_type=self.dt2).count(), 0)
- self.async_task.assert_called_once()
- args, kwargs = self.async_task.call_args
- self.assertCountEqual(kwargs["document_ids"], [self.doc2.id, self.doc3.id])
-
- def test_set_document_storage_path(self):
- """
- GIVEN:
- - 5 documents without defined storage path
- WHEN:
- - Bulk edit called to add storage path to 1 document
- THEN:
- - Single document storage path update
- """
- self.assertEqual(Document.objects.filter(storage_path=None).count(), 5)
-
- bulk_edit.set_storage_path(
- [self.doc1.id],
- self.sp1.id,
- )
-
- self.assertEqual(Document.objects.filter(storage_path=None).count(), 4)
-
- self.async_task.assert_called_once()
- args, kwargs = self.async_task.call_args
-
- self.assertCountEqual(kwargs["document_ids"], [self.doc1.id])
-
- def test_unset_document_storage_path(self):
- """
- GIVEN:
- - 4 documents without defined storage path
- - 1 document with a defined storage
- WHEN:
- - Bulk edit called to remove storage path from 1 document
- THEN:
- - Single document storage path removed
- """
- self.assertEqual(Document.objects.filter(storage_path=None).count(), 5)
-
- bulk_edit.set_storage_path(
- [self.doc1.id],
- self.sp1.id,
- )
-
- self.assertEqual(Document.objects.filter(storage_path=None).count(), 4)
-
- bulk_edit.set_storage_path(
- [self.doc1.id],
- None,
- )
-
- self.assertEqual(Document.objects.filter(storage_path=None).count(), 5)
-
- self.async_task.assert_called()
- args, kwargs = self.async_task.call_args
-
- self.assertCountEqual(kwargs["document_ids"], [self.doc1.id])
-
- def test_add_tag(self):
- self.assertEqual(Document.objects.filter(tags__id=self.t1.id).count(), 2)
- bulk_edit.add_tag(
- [self.doc1.id, self.doc2.id, self.doc3.id, self.doc4.id],
- self.t1.id,
- )
- self.assertEqual(Document.objects.filter(tags__id=self.t1.id).count(), 4)
- self.async_task.assert_called_once()
- args, kwargs = self.async_task.call_args
- self.assertCountEqual(kwargs["document_ids"], [self.doc1.id, self.doc3.id])
-
- def test_remove_tag(self):
- self.assertEqual(Document.objects.filter(tags__id=self.t1.id).count(), 2)
- bulk_edit.remove_tag([self.doc1.id, self.doc3.id, self.doc4.id], self.t1.id)
- self.assertEqual(Document.objects.filter(tags__id=self.t1.id).count(), 1)
- self.async_task.assert_called_once()
- args, kwargs = self.async_task.call_args
- self.assertCountEqual(kwargs["document_ids"], [self.doc4.id])
-
- def test_modify_tags(self):
- tag_unrelated = Tag.objects.create(name="unrelated")
- self.doc2.tags.add(tag_unrelated)
- self.doc3.tags.add(tag_unrelated)
- bulk_edit.modify_tags(
- [self.doc2.id, self.doc3.id],
- add_tags=[self.t2.id],
- remove_tags=[self.t1.id],
- )
-
- self.assertCountEqual(list(self.doc2.tags.all()), [self.t2, tag_unrelated])
- self.assertCountEqual(list(self.doc3.tags.all()), [self.t2, tag_unrelated])
-
- self.async_task.assert_called_once()
- args, kwargs = self.async_task.call_args
- # TODO: doc3 should not be affected, but the query for that is rather complicated
- self.assertCountEqual(kwargs["document_ids"], [self.doc2.id, self.doc3.id])
-
- def test_delete(self):
- self.assertEqual(Document.objects.count(), 5)
- bulk_edit.delete([self.doc1.id, self.doc2.id])
- self.assertEqual(Document.objects.count(), 3)
- self.assertCountEqual(
- [doc.id for doc in Document.objects.all()],
- [self.doc3.id, self.doc4.id, self.doc5.id],
- )
-
@mock.patch("documents.serialisers.bulk_edit.set_correspondent")
def test_api_set_correspondent(self, m):
m.return_value = "OK"
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
- {"documents": [self.doc1.id], "method": "delete", "parameters": {}},
+ {"documents": [self.doc1.id], "method": "delete"},
),
content_type="application/json",
)
self.assertEqual(Document.objects.count(), 5)
response = self.client.post(
"/api/documents/bulk_edit/",
- json.dumps({"documents": [-235], "method": "delete", "parameters": {}}),
+ json.dumps({"documents": [-235], "method": "delete"}),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
- {"documents": [self.doc2.id], "method": "add_tag", "parameters": {}},
+ {"documents": [self.doc2.id], "method": "add_tag"},
),
content_type="application/json",
)
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
- {"documents": [self.doc2.id], "method": "remove_tag", "parameters": {}},
+ {"documents": [self.doc2.id], "method": "remove_tag"},
),
content_type="application/json",
)
from guardian.shortcuts import get_groups_with_perms
from guardian.shortcuts import get_users_with_perms
-from documents.bulk_edit import set_permissions
+from documents import bulk_edit
+from documents.models import Correspondent
from documents.models import Document
+from documents.models import DocumentType
+from documents.models import StoragePath
+from documents.models import Tag
from documents.tests.utils import DirectoriesMixin
-class TestBulkEditPermissions(DirectoriesMixin, TestCase):
+class TestBulkEdit(DirectoriesMixin, TestCase):
def setUp(self):
super().setUp()
- self.doc1 = Document.objects.create(checksum="A", title="A")
- self.doc2 = Document.objects.create(checksum="B", title="B")
- self.doc3 = Document.objects.create(checksum="C", title="C")
-
self.owner = User.objects.create(username="test_owner")
self.user1 = User.objects.create(username="user1")
self.user2 = User.objects.create(username="user2")
self.group1 = Group.objects.create(name="group1")
self.group2 = Group.objects.create(name="group2")
+ patcher = mock.patch("documents.bulk_edit.bulk_update_documents.delay")
+ self.async_task = patcher.start()
+ self.addCleanup(patcher.stop)
+ self.c1 = Correspondent.objects.create(name="c1")
+ self.c2 = Correspondent.objects.create(name="c2")
+ self.dt1 = DocumentType.objects.create(name="dt1")
+ self.dt2 = DocumentType.objects.create(name="dt2")
+ self.t1 = Tag.objects.create(name="t1")
+ self.t2 = Tag.objects.create(name="t2")
+ self.doc1 = Document.objects.create(checksum="A", title="A")
+ self.doc2 = Document.objects.create(
+ checksum="B",
+ title="B",
+ correspondent=self.c1,
+ document_type=self.dt1,
+ )
+ self.doc3 = Document.objects.create(
+ checksum="C",
+ title="C",
+ correspondent=self.c2,
+ document_type=self.dt2,
+ )
+ self.doc4 = Document.objects.create(checksum="D", title="D")
+ self.doc5 = Document.objects.create(checksum="E", title="E")
+ self.doc2.tags.add(self.t1)
+ self.doc3.tags.add(self.t2)
+ self.doc4.tags.add(self.t1, self.t2)
+ self.sp1 = StoragePath.objects.create(name="sp1", path="Something/{checksum}")
+
+ def test_set_correspondent(self):
+ self.assertEqual(Document.objects.filter(correspondent=self.c2).count(), 1)
+ bulk_edit.set_correspondent(
+ [self.doc1.id, self.doc2.id, self.doc3.id],
+ self.c2.id,
+ )
+ self.assertEqual(Document.objects.filter(correspondent=self.c2).count(), 3)
+ self.async_task.assert_called_once()
+ args, kwargs = self.async_task.call_args
+ self.assertCountEqual(kwargs["document_ids"], [self.doc1.id, self.doc2.id])
+
+ def test_unset_correspondent(self):
+ self.assertEqual(Document.objects.filter(correspondent=self.c2).count(), 1)
+ bulk_edit.set_correspondent([self.doc1.id, self.doc2.id, self.doc3.id], None)
+ self.assertEqual(Document.objects.filter(correspondent=self.c2).count(), 0)
+ self.async_task.assert_called_once()
+ args, kwargs = self.async_task.call_args
+ self.assertCountEqual(kwargs["document_ids"], [self.doc2.id, self.doc3.id])
+
+ def test_set_document_type(self):
+ self.assertEqual(Document.objects.filter(document_type=self.dt2).count(), 1)
+ bulk_edit.set_document_type(
+ [self.doc1.id, self.doc2.id, self.doc3.id],
+ self.dt2.id,
+ )
+ self.assertEqual(Document.objects.filter(document_type=self.dt2).count(), 3)
+ self.async_task.assert_called_once()
+ args, kwargs = self.async_task.call_args
+ self.assertCountEqual(kwargs["document_ids"], [self.doc1.id, self.doc2.id])
+
+ def test_unset_document_type(self):
+ self.assertEqual(Document.objects.filter(document_type=self.dt2).count(), 1)
+ bulk_edit.set_document_type([self.doc1.id, self.doc2.id, self.doc3.id], None)
+ self.assertEqual(Document.objects.filter(document_type=self.dt2).count(), 0)
+ self.async_task.assert_called_once()
+ args, kwargs = self.async_task.call_args
+ self.assertCountEqual(kwargs["document_ids"], [self.doc2.id, self.doc3.id])
+
+ def test_set_document_storage_path(self):
+ """
+ GIVEN:
+ - 5 documents without defined storage path
+ WHEN:
+ - Bulk edit called to add storage path to 1 document
+ THEN:
+ - Single document storage path update
+ """
+ self.assertEqual(Document.objects.filter(storage_path=None).count(), 5)
+
+ bulk_edit.set_storage_path(
+ [self.doc1.id],
+ self.sp1.id,
+ )
+
+ self.assertEqual(Document.objects.filter(storage_path=None).count(), 4)
+
+ self.async_task.assert_called_once()
+ args, kwargs = self.async_task.call_args
+
+ self.assertCountEqual(kwargs["document_ids"], [self.doc1.id])
+
+ def test_unset_document_storage_path(self):
+ """
+ GIVEN:
+ - 4 documents without defined storage path
+ - 1 document with a defined storage
+ WHEN:
+ - Bulk edit called to remove storage path from 1 document
+ THEN:
+ - Single document storage path removed
+ """
+ self.assertEqual(Document.objects.filter(storage_path=None).count(), 5)
+
+ bulk_edit.set_storage_path(
+ [self.doc1.id],
+ self.sp1.id,
+ )
+
+ self.assertEqual(Document.objects.filter(storage_path=None).count(), 4)
+
+ bulk_edit.set_storage_path(
+ [self.doc1.id],
+ None,
+ )
+
+ self.assertEqual(Document.objects.filter(storage_path=None).count(), 5)
+
+ self.async_task.assert_called()
+ args, kwargs = self.async_task.call_args
+
+ self.assertCountEqual(kwargs["document_ids"], [self.doc1.id])
+
+ def test_add_tag(self):
+ self.assertEqual(Document.objects.filter(tags__id=self.t1.id).count(), 2)
+ bulk_edit.add_tag(
+ [self.doc1.id, self.doc2.id, self.doc3.id, self.doc4.id],
+ self.t1.id,
+ )
+ self.assertEqual(Document.objects.filter(tags__id=self.t1.id).count(), 4)
+ self.async_task.assert_called_once()
+ args, kwargs = self.async_task.call_args
+ self.assertCountEqual(kwargs["document_ids"], [self.doc1.id, self.doc3.id])
+
+ def test_remove_tag(self):
+ self.assertEqual(Document.objects.filter(tags__id=self.t1.id).count(), 2)
+ bulk_edit.remove_tag([self.doc1.id, self.doc3.id, self.doc4.id], self.t1.id)
+ self.assertEqual(Document.objects.filter(tags__id=self.t1.id).count(), 1)
+ self.async_task.assert_called_once()
+ args, kwargs = self.async_task.call_args
+ self.assertCountEqual(kwargs["document_ids"], [self.doc4.id])
+
+ def test_modify_tags(self):
+ tag_unrelated = Tag.objects.create(name="unrelated")
+ self.doc2.tags.add(tag_unrelated)
+ self.doc3.tags.add(tag_unrelated)
+ bulk_edit.modify_tags(
+ [self.doc2.id, self.doc3.id],
+ add_tags=[self.t2.id],
+ remove_tags=[self.t1.id],
+ )
+
+ self.assertCountEqual(list(self.doc2.tags.all()), [self.t2, tag_unrelated])
+ self.assertCountEqual(list(self.doc3.tags.all()), [self.t2, tag_unrelated])
+
+ self.async_task.assert_called_once()
+ args, kwargs = self.async_task.call_args
+ # TODO: doc3 should not be affected, but the query for that is rather complicated
+ self.assertCountEqual(kwargs["document_ids"], [self.doc2.id, self.doc3.id])
+
+ def test_delete(self):
+ self.assertEqual(Document.objects.count(), 5)
+ bulk_edit.delete([self.doc1.id, self.doc2.id])
+ self.assertEqual(Document.objects.count(), 3)
+ self.assertCountEqual(
+ [doc.id for doc in Document.objects.all()],
+ [self.doc3.id, self.doc4.id, self.doc5.id],
+ )
+
@mock.patch("documents.tasks.bulk_update_documents.delay")
def test_set_permissions(self, m):
doc_ids = [self.doc1.id, self.doc2.id, self.doc3.id]
},
}
- set_permissions(
+ bulk_edit.set_permissions(
doc_ids,
set_permissions=permissions,
owner=self.owner,
"groups": [self.group2.id],
},
}
- set_permissions(
+ bulk_edit.set_permissions(
doc_ids,
set_permissions=permissions,
owner=self.owner,