from unittest.mock import MagicMock
import celery
-import pytest
from dateutil import parser
-from dateutil.relativedelta import relativedelta
from django.conf import settings
from django.contrib.auth.models import Group
from django.contrib.auth.models import Permission
from guardian.shortcuts import get_users_with_perms
from rest_framework import status
from rest_framework.test import APITestCase
-from whoosh.writing import AsyncWriter
from documents import bulk_edit
-from documents import index
from documents.data_models import DocumentSource
from documents.models import ConsumptionTemplate
from documents.models import Correspondent
[u1_doc1.id, u1_doc2.id, u2_doc2.id],
)
- def test_search(self):
- d1 = Document.objects.create(
- title="invoice",
- content="the thing i bought at a shop and paid with bank account",
- checksum="A",
- pk=1,
- )
- d2 = Document.objects.create(
- title="bank statement 1",
- content="things i paid for in august",
- pk=2,
- checksum="B",
- )
- d3 = Document.objects.create(
- title="bank statement 3",
- content="things i paid for in september",
- pk=3,
- checksum="C",
- original_filename="someepdf.pdf",
- )
- with AsyncWriter(index.open_index()) as writer:
- # Note to future self: there is a reason we dont use a model signal handler to update the index: some operations edit many documents at once
- # (retagger, renamer) and we don't want to open a writer for each of these, but rather perform the entire operation with one writer.
- # That's why we cant open the writer in a model on_save handler or something.
- index.update_document(writer, d1)
- index.update_document(writer, d2)
- index.update_document(writer, d3)
- response = self.client.get("/api/documents/?query=bank")
- results = response.data["results"]
- self.assertEqual(response.data["count"], 3)
- self.assertEqual(len(results), 3)
- self.assertCountEqual(response.data["all"], [d1.id, d2.id, d3.id])
-
- response = self.client.get("/api/documents/?query=september")
- results = response.data["results"]
- self.assertEqual(response.data["count"], 1)
- self.assertEqual(len(results), 1)
- self.assertCountEqual(response.data["all"], [d3.id])
- self.assertEqual(results[0]["original_file_name"], "someepdf.pdf")
-
- response = self.client.get("/api/documents/?query=statement")
- results = response.data["results"]
- self.assertEqual(response.data["count"], 2)
- self.assertEqual(len(results), 2)
- self.assertCountEqual(response.data["all"], [d2.id, d3.id])
-
- response = self.client.get("/api/documents/?query=sfegdfg")
- results = response.data["results"]
- self.assertEqual(response.data["count"], 0)
- self.assertEqual(len(results), 0)
- self.assertCountEqual(response.data["all"], [])
-
- def test_search_multi_page(self):
- with AsyncWriter(index.open_index()) as writer:
- for i in range(55):
- doc = Document.objects.create(
- checksum=str(i),
- pk=i + 1,
- title=f"Document {i+1}",
- content="content",
- )
- index.update_document(writer, doc)
-
- # This is here so that we test that no document gets returned twice (might happen if the paging is not working)
- seen_ids = []
-
- for i in range(1, 6):
- response = self.client.get(
- f"/api/documents/?query=content&page={i}&page_size=10",
- )
- results = response.data["results"]
- self.assertEqual(response.data["count"], 55)
- self.assertEqual(len(results), 10)
-
- for result in results:
- self.assertNotIn(result["id"], seen_ids)
- seen_ids.append(result["id"])
-
- response = self.client.get("/api/documents/?query=content&page=6&page_size=10")
- results = response.data["results"]
- self.assertEqual(response.data["count"], 55)
- self.assertEqual(len(results), 5)
-
- for result in results:
- self.assertNotIn(result["id"], seen_ids)
- seen_ids.append(result["id"])
-
- def test_search_invalid_page(self):
- with AsyncWriter(index.open_index()) as writer:
- for i in range(15):
- doc = Document.objects.create(
- checksum=str(i),
- pk=i + 1,
- title=f"Document {i+1}",
- content="content",
- )
- index.update_document(writer, doc)
-
- response = self.client.get("/api/documents/?query=content&page=0&page_size=10")
- self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
- response = self.client.get("/api/documents/?query=content&page=3&page_size=10")
- self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
-
- @override_settings(
- TIME_ZONE="UTC",
- )
- def test_search_added_in_last_week(self):
- """
- GIVEN:
- - Three documents added right now
- - The timezone is UTC time
- WHEN:
- - Query for documents added in the last 7 days
- THEN:
- - All three recent documents are returned
- """
- d1 = Document.objects.create(
- title="invoice",
- content="the thing i bought at a shop and paid with bank account",
- checksum="A",
- pk=1,
- )
- d2 = Document.objects.create(
- title="bank statement 1",
- content="things i paid for in august",
- pk=2,
- checksum="B",
- )
- d3 = Document.objects.create(
- title="bank statement 3",
- content="things i paid for in september",
- pk=3,
- checksum="C",
- )
- with index.open_index_writer() as writer:
- index.update_document(writer, d1)
- index.update_document(writer, d2)
- index.update_document(writer, d3)
-
- response = self.client.get("/api/documents/?query=added:[-1 week to now]")
- results = response.data["results"]
- # Expect 3 documents returned
- self.assertEqual(len(results), 3)
-
- for idx, subset in enumerate(
- [
- {"id": 1, "title": "invoice"},
- {"id": 2, "title": "bank statement 1"},
- {"id": 3, "title": "bank statement 3"},
- ],
- ):
- result = results[idx]
- # Assert subset in results
- self.assertDictEqual(result, {**result, **subset})
-
- @override_settings(
- TIME_ZONE="America/Chicago",
- )
- def test_search_added_in_last_week_with_timezone_behind(self):
- """
- GIVEN:
- - Two documents added right now
- - One document added over a week ago
- - The timezone is behind UTC time (-6)
- WHEN:
- - Query for documents added in the last 7 days
- THEN:
- - The two recent documents are returned
- """
- d1 = Document.objects.create(
- title="invoice",
- content="the thing i bought at a shop and paid with bank account",
- checksum="A",
- pk=1,
- )
- d2 = Document.objects.create(
- title="bank statement 1",
- content="things i paid for in august",
- pk=2,
- checksum="B",
- )
- d3 = Document.objects.create(
- title="bank statement 3",
- content="things i paid for in september",
- pk=3,
- checksum="C",
- # 7 days, 1 hour and 1 minute ago
- added=timezone.now() - timedelta(days=7, hours=1, minutes=1),
- )
- with index.open_index_writer() as writer:
- index.update_document(writer, d1)
- index.update_document(writer, d2)
- index.update_document(writer, d3)
-
- response = self.client.get("/api/documents/?query=added:[-1 week to now]")
- results = response.data["results"]
-
- # Expect 2 documents returned
- self.assertEqual(len(results), 2)
-
- for idx, subset in enumerate(
- [{"id": 1, "title": "invoice"}, {"id": 2, "title": "bank statement 1"}],
- ):
- result = results[idx]
- # Assert subset in results
- self.assertDictEqual(result, {**result, **subset})
-
- @override_settings(
- TIME_ZONE="Europe/Sofia",
- )
- def test_search_added_in_last_week_with_timezone_ahead(self):
- """
- GIVEN:
- - Two documents added right now
- - One document added over a week ago
- - The timezone is behind UTC time (+2)
- WHEN:
- - Query for documents added in the last 7 days
- THEN:
- - The two recent documents are returned
- """
- d1 = Document.objects.create(
- title="invoice",
- content="the thing i bought at a shop and paid with bank account",
- checksum="A",
- pk=1,
- )
- d2 = Document.objects.create(
- title="bank statement 1",
- content="things i paid for in august",
- pk=2,
- checksum="B",
- )
- d3 = Document.objects.create(
- title="bank statement 3",
- content="things i paid for in september",
- pk=3,
- checksum="C",
- # 7 days, 1 hour and 1 minute ago
- added=timezone.now() - timedelta(days=7, hours=1, minutes=1),
- )
- with index.open_index_writer() as writer:
- index.update_document(writer, d1)
- index.update_document(writer, d2)
- index.update_document(writer, d3)
-
- response = self.client.get("/api/documents/?query=added:[-1 week to now]")
- results = response.data["results"]
-
- # Expect 2 documents returned
- self.assertEqual(len(results), 2)
-
- for idx, subset in enumerate(
- [{"id": 1, "title": "invoice"}, {"id": 2, "title": "bank statement 1"}],
- ):
- result = results[idx]
- # Assert subset in results
- self.assertDictEqual(result, {**result, **subset})
-
- @override_settings(
- TIME_ZONE="Europe/Sofia",
- )
- def test_search_added_specific_date_with_timezone_ahead(self):
- """
- GIVEN:
- - Two documents added right now
- - One document added on a specific date
- - The timezone is behind UTC time (+2)
- WHEN:
- - Query for documents added on a specific date
- THEN:
- - The one document is returned
- """
- d1 = Document.objects.create(
- title="invoice",
- content="the thing i bought at a shop and paid with bank account",
- checksum="A",
- pk=1,
- )
- d2 = Document.objects.create(
- title="bank statement 1",
- content="things i paid for in august",
- pk=2,
- checksum="B",
- )
- d3 = Document.objects.create(
- title="bank statement 3",
- content="things i paid for in september",
- pk=3,
- checksum="C",
- # specific time zone aware date
- added=timezone.make_aware(datetime.datetime(2023, 12, 1)),
- )
- # refresh doc instance to ensure we operate on date objects that Django uses
- # Django converts dates to UTC
- d3.refresh_from_db()
-
- with index.open_index_writer() as writer:
- index.update_document(writer, d1)
- index.update_document(writer, d2)
- index.update_document(writer, d3)
-
- response = self.client.get("/api/documents/?query=added:20231201")
- results = response.data["results"]
-
- # Expect 1 document returned
- self.assertEqual(len(results), 1)
-
- for idx, subset in enumerate(
- [{"id": 3, "title": "bank statement 3"}],
- ):
- result = results[idx]
- # Assert subset in results
- self.assertDictEqual(result, {**result, **subset})
-
- def test_search_added_in_last_month(self):
- """
- GIVEN:
- - One document added right now
- - One documents added about a week ago
- - One document added over 1 month
- WHEN:
- - Query for documents added in the last month
- THEN:
- - The two recent documents are returned
- """
- d1 = Document.objects.create(
- title="invoice",
- content="the thing i bought at a shop and paid with bank account",
- checksum="A",
- pk=1,
- )
- d2 = Document.objects.create(
- title="bank statement 1",
- content="things i paid for in august",
- pk=2,
- checksum="B",
- # 1 month, 1 day ago
- added=timezone.now() - relativedelta(months=1, days=1),
- )
- d3 = Document.objects.create(
- title="bank statement 3",
- content="things i paid for in september",
- pk=3,
- checksum="C",
- # 7 days, 1 hour and 1 minute ago
- added=timezone.now() - timedelta(days=7, hours=1, minutes=1),
- )
-
- with index.open_index_writer() as writer:
- index.update_document(writer, d1)
- index.update_document(writer, d2)
- index.update_document(writer, d3)
-
- response = self.client.get("/api/documents/?query=added:[-1 month to now]")
- results = response.data["results"]
-
- # Expect 2 documents returned
- self.assertEqual(len(results), 2)
-
- for idx, subset in enumerate(
- [{"id": 1, "title": "invoice"}, {"id": 3, "title": "bank statement 3"}],
- ):
- result = results[idx]
- # Assert subset in results
- self.assertDictEqual(result, {**result, **subset})
-
- @override_settings(
- TIME_ZONE="America/Denver",
- )
- def test_search_added_in_last_month_timezone_behind(self):
- """
- GIVEN:
- - One document added right now
- - One documents added about a week ago
- - One document added over 1 month
- - The timezone is behind UTC time (-6 or -7)
- WHEN:
- - Query for documents added in the last month
- THEN:
- - The two recent documents are returned
- """
- d1 = Document.objects.create(
- title="invoice",
- content="the thing i bought at a shop and paid with bank account",
- checksum="A",
- pk=1,
- )
- d2 = Document.objects.create(
- title="bank statement 1",
- content="things i paid for in august",
- pk=2,
- checksum="B",
- # 1 month, 1 day ago
- added=timezone.now() - relativedelta(months=1, days=1),
- )
- d3 = Document.objects.create(
- title="bank statement 3",
- content="things i paid for in september",
- pk=3,
- checksum="C",
- # 7 days, 1 hour and 1 minute ago
- added=timezone.now() - timedelta(days=7, hours=1, minutes=1),
- )
-
- with index.open_index_writer() as writer:
- index.update_document(writer, d1)
- index.update_document(writer, d2)
- index.update_document(writer, d3)
-
- response = self.client.get("/api/documents/?query=added:[-1 month to now]")
- results = response.data["results"]
-
- # Expect 2 documents returned
- self.assertEqual(len(results), 2)
-
- for idx, subset in enumerate(
- [{"id": 1, "title": "invoice"}, {"id": 3, "title": "bank statement 3"}],
- ):
- result = results[idx]
- # Assert subset in results
- self.assertDictEqual(result, {**result, **subset})
-
- @mock.patch("documents.index.autocomplete")
- def test_search_autocomplete(self, m):
- m.side_effect = lambda ix, term, limit, user: [term for _ in range(limit)]
-
- response = self.client.get("/api/search/autocomplete/?term=test")
- self.assertEqual(response.status_code, status.HTTP_200_OK)
- self.assertEqual(len(response.data), 10)
-
- response = self.client.get("/api/search/autocomplete/?term=test&limit=20")
- self.assertEqual(response.status_code, status.HTTP_200_OK)
- self.assertEqual(len(response.data), 20)
-
- response = self.client.get("/api/search/autocomplete/?term=test&limit=-1")
- self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
-
- response = self.client.get("/api/search/autocomplete/")
- self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
-
- response = self.client.get("/api/search/autocomplete/?term=")
- self.assertEqual(response.status_code, status.HTTP_200_OK)
- self.assertEqual(len(response.data), 10)
-
- def test_search_autocomplete_respect_permissions(self):
- """
- GIVEN:
- - Multiple users and documents with & without permissions
- WHEN:
- - API reuqest for autocomplete is made by user with or without permissions
- THEN:
- - Terms only within docs user has access to are returned
- """
- u1 = User.objects.create_user("user1")
- u2 = User.objects.create_user("user2")
-
- self.client.force_authenticate(user=u1)
-
- d1 = Document.objects.create(
- title="doc1",
- content="apples",
- checksum="1",
- owner=u1,
- )
- d2 = Document.objects.create(
- title="doc2",
- content="applebaum",
- checksum="2",
- owner=u1,
- )
- d3 = Document.objects.create(
- title="doc3",
- content="appletini",
- checksum="3",
- owner=u1,
- )
-
- with AsyncWriter(index.open_index()) as writer:
- index.update_document(writer, d1)
- index.update_document(writer, d2)
- index.update_document(writer, d3)
-
- response = self.client.get("/api/search/autocomplete/?term=app")
- self.assertEqual(response.status_code, status.HTTP_200_OK)
- self.assertEqual(response.data, [b"apples", b"applebaum", b"appletini"])
-
- d3.owner = u2
-
- with AsyncWriter(index.open_index()) as writer:
- index.update_document(writer, d3)
-
- response = self.client.get("/api/search/autocomplete/?term=app")
- self.assertEqual(response.status_code, status.HTTP_200_OK)
- self.assertEqual(response.data, [b"apples", b"applebaum"])
-
- assign_perm("view_document", u1, d3)
-
- with AsyncWriter(index.open_index()) as writer:
- index.update_document(writer, d3)
-
- response = self.client.get("/api/search/autocomplete/?term=app")
- self.assertEqual(response.status_code, status.HTTP_200_OK)
- self.assertEqual(response.data, [b"apples", b"applebaum", b"appletini"])
-
- @pytest.mark.skip(reason="Not implemented yet")
- def test_search_spelling_correction(self):
- with AsyncWriter(index.open_index()) as writer:
- for i in range(55):
- doc = Document.objects.create(
- checksum=str(i),
- pk=i + 1,
- title=f"Document {i+1}",
- content=f"Things document {i+1}",
- )
- index.update_document(writer, doc)
-
- response = self.client.get("/api/search/?query=thing")
- correction = response.data["corrected_query"]
-
- self.assertEqual(correction, "things")
-
- response = self.client.get("/api/search/?query=things")
- correction = response.data["corrected_query"]
-
- self.assertEqual(correction, None)
-
- def test_search_more_like(self):
- d1 = Document.objects.create(
- title="invoice",
- content="the thing i bought at a shop and paid with bank account",
- checksum="A",
- pk=1,
- )
- d2 = Document.objects.create(
- title="bank statement 1",
- content="things i paid for in august",
- pk=2,
- checksum="B",
- )
- d3 = Document.objects.create(
- title="bank statement 3",
- content="things i paid for in september",
- pk=3,
- checksum="C",
- )
- with AsyncWriter(index.open_index()) as writer:
- index.update_document(writer, d1)
- index.update_document(writer, d2)
- index.update_document(writer, d3)
-
- response = self.client.get(f"/api/documents/?more_like_id={d2.id}")
-
- self.assertEqual(response.status_code, status.HTTP_200_OK)
-
- results = response.data["results"]
-
- self.assertEqual(len(results), 2)
- self.assertEqual(results[0]["id"], d3.id)
- self.assertEqual(results[1]["id"], d1.id)
-
- def test_search_filtering(self):
- t = Tag.objects.create(name="tag")
- t2 = Tag.objects.create(name="tag2")
- c = Correspondent.objects.create(name="correspondent")
- c2 = Correspondent.objects.create(name="correspondent2")
- dt = DocumentType.objects.create(name="type")
- dt2 = DocumentType.objects.create(name="type2")
- sp = StoragePath.objects.create(name="path")
- sp2 = StoragePath.objects.create(name="path2")
- cf1 = CustomField.objects.create(
- name="string field",
- data_type=CustomField.FieldDataType.STRING,
- )
- cf2 = CustomField.objects.create(
- name="number field",
- data_type=CustomField.FieldDataType.INT,
- )
-
- d1 = Document.objects.create(checksum="1", correspondent=c, content="test")
- d2 = Document.objects.create(checksum="2", document_type=dt, content="test")
- d3 = Document.objects.create(checksum="3", content="test")
-
- d3.tags.add(t)
- d3.tags.add(t2)
- d4 = Document.objects.create(
- checksum="4",
- created=timezone.make_aware(datetime.datetime(2020, 7, 13)),
- content="test",
- original_filename="doc4.pdf",
- )
- d4.tags.add(t2)
- d5 = Document.objects.create(
- checksum="5",
- added=timezone.make_aware(datetime.datetime(2020, 7, 13)),
- content="test",
- original_filename="doc5.pdf",
- )
- Document.objects.create(checksum="6", content="test2")
- d7 = Document.objects.create(checksum="7", storage_path=sp, content="test")
- d8 = Document.objects.create(
- checksum="foo",
- correspondent=c2,
- document_type=dt2,
- storage_path=sp2,
- content="test",
- )
-
- cf1_d1 = CustomFieldInstance.objects.create(
- document=d1,
- field=cf1,
- value_text="foobard1",
- )
- cf2_d1 = CustomFieldInstance.objects.create(
- document=d1,
- field=cf2,
- value_int=999,
- )
- cf1_d4 = CustomFieldInstance.objects.create(
- document=d4,
- field=cf1,
- value_text="foobard4",
- )
-
- with AsyncWriter(index.open_index()) as writer:
- for doc in Document.objects.all():
- index.update_document(writer, doc)
-
- def search_query(q):
- r = self.client.get("/api/documents/?query=test" + q)
- self.assertEqual(r.status_code, status.HTTP_200_OK)
- return [hit["id"] for hit in r.data["results"]]
-
- self.assertCountEqual(
- search_query(""),
- [d1.id, d2.id, d3.id, d4.id, d5.id, d7.id, d8.id],
- )
- self.assertCountEqual(search_query("&is_tagged=true"), [d3.id, d4.id])
- self.assertCountEqual(
- search_query("&is_tagged=false"),
- [d1.id, d2.id, d5.id, d7.id, d8.id],
- )
- self.assertCountEqual(search_query("&correspondent__id=" + str(c.id)), [d1.id])
- self.assertCountEqual(
- search_query(f"&correspondent__id__in={c.id},{c2.id}"),
- [d1.id, d8.id],
- )
- self.assertCountEqual(
- search_query("&correspondent__id__none=" + str(c.id)),
- [d2.id, d3.id, d4.id, d5.id, d7.id, d8.id],
- )
- self.assertCountEqual(search_query("&document_type__id=" + str(dt.id)), [d2.id])
- self.assertCountEqual(
- search_query(f"&document_type__id__in={dt.id},{dt2.id}"),
- [d2.id, d8.id],
- )
- self.assertCountEqual(
- search_query("&document_type__id__none=" + str(dt.id)),
- [d1.id, d3.id, d4.id, d5.id, d7.id, d8.id],
- )
- self.assertCountEqual(search_query("&storage_path__id=" + str(sp.id)), [d7.id])
- self.assertCountEqual(
- search_query(f"&storage_path__id__in={sp.id},{sp2.id}"),
- [d7.id, d8.id],
- )
- self.assertCountEqual(
- search_query("&storage_path__id__none=" + str(sp.id)),
- [d1.id, d2.id, d3.id, d4.id, d5.id, d8.id],
- )
-
- self.assertCountEqual(
- search_query("&storage_path__isnull=true"),
- [d1.id, d2.id, d3.id, d4.id, d5.id],
- )
- self.assertCountEqual(
- search_query("&correspondent__isnull=true"),
- [d2.id, d3.id, d4.id, d5.id, d7.id],
- )
- self.assertCountEqual(
- search_query("&document_type__isnull=true"),
- [d1.id, d3.id, d4.id, d5.id, d7.id],
- )
- self.assertCountEqual(
- search_query("&tags__id__all=" + str(t.id) + "," + str(t2.id)),
- [d3.id],
- )
- self.assertCountEqual(search_query("&tags__id__all=" + str(t.id)), [d3.id])
- self.assertCountEqual(
- search_query("&tags__id__all=" + str(t2.id)),
- [d3.id, d4.id],
- )
- self.assertCountEqual(
- search_query(f"&tags__id__in={t.id},{t2.id}"),
- [d3.id, d4.id],
- )
- self.assertCountEqual(
- search_query(f"&tags__id__none={t.id},{t2.id}"),
- [d1.id, d2.id, d5.id, d7.id, d8.id],
- )
-
- self.assertIn(
- d4.id,
- search_query(
- "&created__date__lt="
- + datetime.datetime(2020, 9, 2).strftime("%Y-%m-%d"),
- ),
- )
- self.assertNotIn(
- d4.id,
- search_query(
- "&created__date__gt="
- + datetime.datetime(2020, 9, 2).strftime("%Y-%m-%d"),
- ),
- )
-
- self.assertNotIn(
- d4.id,
- search_query(
- "&created__date__lt="
- + datetime.datetime(2020, 1, 2).strftime("%Y-%m-%d"),
- ),
- )
- self.assertIn(
- d4.id,
- search_query(
- "&created__date__gt="
- + datetime.datetime(2020, 1, 2).strftime("%Y-%m-%d"),
- ),
- )
-
- self.assertIn(
- d5.id,
- search_query(
- "&added__date__lt="
- + datetime.datetime(2020, 9, 2).strftime("%Y-%m-%d"),
- ),
- )
- self.assertNotIn(
- d5.id,
- search_query(
- "&added__date__gt="
- + datetime.datetime(2020, 9, 2).strftime("%Y-%m-%d"),
- ),
- )
-
- self.assertNotIn(
- d5.id,
- search_query(
- "&added__date__lt="
- + datetime.datetime(2020, 1, 2).strftime("%Y-%m-%d"),
- ),
- )
-
- self.assertIn(
- d5.id,
- search_query(
- "&added__date__gt="
- + datetime.datetime(2020, 1, 2).strftime("%Y-%m-%d"),
- ),
- )
-
- self.assertEqual(
- search_query("&checksum__icontains=foo"),
- [d8.id],
- )
-
- self.assertCountEqual(
- search_query("&original_filename__istartswith=doc"),
- [d4.id, d5.id],
- )
-
- self.assertIn(
- d1.id,
- search_query(
- "&custom_fields__icontains=" + cf1_d1.value,
- ),
- )
-
- self.assertIn(
- d1.id,
- search_query(
- "&custom_fields__icontains=" + str(cf2_d1.value),
- ),
- )
-
- self.assertIn(
- d4.id,
- search_query(
- "&custom_fields__icontains=" + cf1_d4.value,
- ),
- )
-
- def test_search_filtering_respect_owner(self):
- """
- GIVEN:
- - Documents with owners set & without
- WHEN:
- - API reuqest for advanced query (search) is made by non-superuser
- - API reuqest for advanced query (search) is made by superuser
- THEN:
- - Only owned docs are returned for regular users
- - All docs are returned for superuser
- """
- superuser = User.objects.create_superuser("superuser")
- u1 = User.objects.create_user("user1")
- u2 = User.objects.create_user("user2")
- u1.user_permissions.add(*Permission.objects.filter(codename="view_document"))
- u2.user_permissions.add(*Permission.objects.filter(codename="view_document"))
-
- Document.objects.create(checksum="1", content="test 1", owner=u1)
- Document.objects.create(checksum="2", content="test 2", owner=u2)
- Document.objects.create(checksum="3", content="test 3", owner=u2)
- Document.objects.create(checksum="4", content="test 4")
-
- with AsyncWriter(index.open_index()) as writer:
- for doc in Document.objects.all():
- index.update_document(writer, doc)
-
- self.client.force_authenticate(user=u1)
- r = self.client.get("/api/documents/?query=test")
- self.assertEqual(r.data["count"], 2)
- r = self.client.get("/api/documents/?query=test&document_type__id__none=1")
- self.assertEqual(r.data["count"], 2)
- r = self.client.get(f"/api/documents/?query=test&owner__id__none={u1.id}")
- self.assertEqual(r.data["count"], 1)
- r = self.client.get(f"/api/documents/?query=test&owner__id__in={u1.id}")
- self.assertEqual(r.data["count"], 1)
- r = self.client.get(
- f"/api/documents/?query=test&owner__id__none={u1.id}&owner__isnull=true",
- )
- self.assertEqual(r.data["count"], 1)
-
- self.client.force_authenticate(user=u2)
- r = self.client.get("/api/documents/?query=test")
- self.assertEqual(r.data["count"], 3)
- r = self.client.get("/api/documents/?query=test&document_type__id__none=1")
- self.assertEqual(r.data["count"], 3)
- r = self.client.get(f"/api/documents/?query=test&owner__id__none={u2.id}")
- self.assertEqual(r.data["count"], 1)
-
- self.client.force_authenticate(user=superuser)
- r = self.client.get("/api/documents/?query=test")
- self.assertEqual(r.data["count"], 4)
- r = self.client.get("/api/documents/?query=test&document_type__id__none=1")
- self.assertEqual(r.data["count"], 4)
- r = self.client.get(f"/api/documents/?query=test&owner__id__none={u1.id}")
- self.assertEqual(r.data["count"], 3)
-
- def test_search_filtering_with_object_perms(self):
- """
- GIVEN:
- - Documents with granted view permissions to others
- WHEN:
- - API reuqest for advanced query (search) is made by user
- THEN:
- - Only docs with granted view permissions are returned
- """
- u1 = User.objects.create_user("user1")
- u2 = User.objects.create_user("user2")
- u1.user_permissions.add(*Permission.objects.filter(codename="view_document"))
- u2.user_permissions.add(*Permission.objects.filter(codename="view_document"))
-
- Document.objects.create(checksum="1", content="test 1", owner=u1)
- d2 = Document.objects.create(checksum="2", content="test 2", owner=u2)
- d3 = Document.objects.create(checksum="3", content="test 3", owner=u2)
- Document.objects.create(checksum="4", content="test 4")
-
- with AsyncWriter(index.open_index()) as writer:
- for doc in Document.objects.all():
- index.update_document(writer, doc)
-
- self.client.force_authenticate(user=u1)
- r = self.client.get("/api/documents/?query=test")
- self.assertEqual(r.data["count"], 2)
- r = self.client.get("/api/documents/?query=test&document_type__id__none=1")
- self.assertEqual(r.data["count"], 2)
- r = self.client.get(f"/api/documents/?query=test&owner__id__none={u1.id}")
- self.assertEqual(r.data["count"], 1)
- r = self.client.get(f"/api/documents/?query=test&owner__id={u1.id}")
- self.assertEqual(r.data["count"], 1)
- r = self.client.get(f"/api/documents/?query=test&owner__id__in={u1.id}")
- self.assertEqual(r.data["count"], 1)
- r = self.client.get("/api/documents/?query=test&owner__isnull=true")
- self.assertEqual(r.data["count"], 1)
-
- assign_perm("view_document", u1, d2)
- assign_perm("view_document", u1, d3)
-
- with AsyncWriter(index.open_index()) as writer:
- for doc in [d2, d3]:
- index.update_document(writer, doc)
-
- self.client.force_authenticate(user=u1)
- r = self.client.get("/api/documents/?query=test")
- self.assertEqual(r.data["count"], 4)
- r = self.client.get("/api/documents/?query=test&document_type__id__none=1")
- self.assertEqual(r.data["count"], 4)
- r = self.client.get(f"/api/documents/?query=test&owner__id__none={u1.id}")
- self.assertEqual(r.data["count"], 3)
- r = self.client.get(f"/api/documents/?query=test&owner__id={u1.id}")
- self.assertEqual(r.data["count"], 1)
- r = self.client.get(f"/api/documents/?query=test&owner__id__in={u1.id}")
- self.assertEqual(r.data["count"], 1)
- r = self.client.get("/api/documents/?query=test&owner__isnull=true")
- self.assertEqual(r.data["count"], 1)
-
- def test_search_sorting(self):
- u1 = User.objects.create_user("user1")
- u2 = User.objects.create_user("user2")
- c1 = Correspondent.objects.create(name="corres Ax")
- c2 = Correspondent.objects.create(name="corres Cx")
- c3 = Correspondent.objects.create(name="corres Bx")
- d1 = Document.objects.create(
- checksum="1",
- correspondent=c1,
- content="test",
- archive_serial_number=2,
- title="3",
- owner=u1,
- )
- d2 = Document.objects.create(
- checksum="2",
- correspondent=c2,
- content="test",
- archive_serial_number=3,
- title="2",
- owner=u2,
- )
- d3 = Document.objects.create(
- checksum="3",
- correspondent=c3,
- content="test",
- archive_serial_number=1,
- title="1",
- )
- Note.objects.create(
- note="This is a note.",
- document=d1,
- user=u1,
- )
- Note.objects.create(
- note="This is a note.",
- document=d1,
- user=u1,
- )
- Note.objects.create(
- note="This is a note.",
- document=d3,
- user=u1,
- )
-
- with AsyncWriter(index.open_index()) as writer:
- for doc in Document.objects.all():
- index.update_document(writer, doc)
-
- def search_query(q):
- r = self.client.get("/api/documents/?query=test" + q)
- self.assertEqual(r.status_code, status.HTTP_200_OK)
- return [hit["id"] for hit in r.data["results"]]
-
- self.assertListEqual(
- search_query("&ordering=archive_serial_number"),
- [d3.id, d1.id, d2.id],
- )
- self.assertListEqual(
- search_query("&ordering=-archive_serial_number"),
- [d2.id, d1.id, d3.id],
- )
- self.assertListEqual(search_query("&ordering=title"), [d3.id, d2.id, d1.id])
- self.assertListEqual(search_query("&ordering=-title"), [d1.id, d2.id, d3.id])
- self.assertListEqual(
- search_query("&ordering=correspondent__name"),
- [d1.id, d3.id, d2.id],
- )
- self.assertListEqual(
- search_query("&ordering=-correspondent__name"),
- [d2.id, d3.id, d1.id],
- )
- self.assertListEqual(
- search_query("&ordering=num_notes"),
- [d2.id, d3.id, d1.id],
- )
- self.assertListEqual(
- search_query("&ordering=-num_notes"),
- [d1.id, d3.id, d2.id],
- )
- self.assertListEqual(
- search_query("&ordering=owner"),
- [d1.id, d2.id, d3.id],
- )
- self.assertListEqual(
- search_query("&ordering=-owner"),
- [d3.id, d2.id, d1.id],
- )
-
def test_pagination_all(self):
"""
GIVEN:
- A set of 50 documents
WHEN:
- - API reuqest for document filtering
+ - API request for document filtering
THEN:
- Results are paginated (25 items) and response["all"] returns all ids (50 items)
"""
--- /dev/null
+import datetime
+from datetime import timedelta
+from unittest import mock
+
+import pytest
+from dateutil.relativedelta import relativedelta
+from django.contrib.auth.models import Permission
+from django.contrib.auth.models import User
+from django.test import override_settings
+from django.utils import timezone
+from guardian.shortcuts import assign_perm
+from rest_framework import status
+from rest_framework.test import APITestCase
+from whoosh.writing import AsyncWriter
+
+from documents import index
+from documents.models import Correspondent
+from documents.models import CustomField
+from documents.models import CustomFieldInstance
+from documents.models import Document
+from documents.models import DocumentType
+from documents.models import Note
+from documents.models import StoragePath
+from documents.models import Tag
+from documents.tests.utils import DirectoriesMixin
+
+
+class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
+ def setUp(self):
+ super().setUp()
+
+ self.user = User.objects.create_superuser(username="temp_admin")
+ self.client.force_authenticate(user=self.user)
+
+ def test_search(self):
+ d1 = Document.objects.create(
+ title="invoice",
+ content="the thing i bought at a shop and paid with bank account",
+ checksum="A",
+ pk=1,
+ )
+ d2 = Document.objects.create(
+ title="bank statement 1",
+ content="things i paid for in august",
+ pk=2,
+ checksum="B",
+ )
+ d3 = Document.objects.create(
+ title="bank statement 3",
+ content="things i paid for in september",
+ pk=3,
+ checksum="C",
+ original_filename="someepdf.pdf",
+ )
+ with AsyncWriter(index.open_index()) as writer:
+ # Note to future self: there is a reason we don't use a model signal handler to update the index: some operations edit many documents at once
+ # (retagger, renamer) and we don't want to open a writer for each of these, but rather perform the entire operation with one writer.
+ # That's why we can't open the writer in a model on_save handler or something.
+ index.update_document(writer, d1)
+ index.update_document(writer, d2)
+ index.update_document(writer, d3)
+ response = self.client.get("/api/documents/?query=bank")
+ results = response.data["results"]
+ self.assertEqual(response.data["count"], 3)
+ self.assertEqual(len(results), 3)
+ self.assertCountEqual(response.data["all"], [d1.id, d2.id, d3.id])
+
+ response = self.client.get("/api/documents/?query=september")
+ results = response.data["results"]
+ self.assertEqual(response.data["count"], 1)
+ self.assertEqual(len(results), 1)
+ self.assertCountEqual(response.data["all"], [d3.id])
+ self.assertEqual(results[0]["original_file_name"], "someepdf.pdf")
+
+ response = self.client.get("/api/documents/?query=statement")
+ results = response.data["results"]
+ self.assertEqual(response.data["count"], 2)
+ self.assertEqual(len(results), 2)
+ self.assertCountEqual(response.data["all"], [d2.id, d3.id])
+
+ response = self.client.get("/api/documents/?query=sfegdfg")
+ results = response.data["results"]
+ self.assertEqual(response.data["count"], 0)
+ self.assertEqual(len(results), 0)
+ self.assertCountEqual(response.data["all"], [])
+
+ def test_search_multi_page(self):
+ with AsyncWriter(index.open_index()) as writer:
+ for i in range(55):
+ doc = Document.objects.create(
+ checksum=str(i),
+ pk=i + 1,
+ title=f"Document {i+1}",
+ content="content",
+ )
+ index.update_document(writer, doc)
+
+ # This is here so that we test that no document gets returned twice (might happen if the paging is not working)
+ seen_ids = []
+
+ for i in range(1, 6):
+ response = self.client.get(
+ f"/api/documents/?query=content&page={i}&page_size=10",
+ )
+ results = response.data["results"]
+ self.assertEqual(response.data["count"], 55)
+ self.assertEqual(len(results), 10)
+
+ for result in results:
+ self.assertNotIn(result["id"], seen_ids)
+ seen_ids.append(result["id"])
+
+ response = self.client.get("/api/documents/?query=content&page=6&page_size=10")
+ results = response.data["results"]
+ self.assertEqual(response.data["count"], 55)
+ self.assertEqual(len(results), 5)
+
+ for result in results:
+ self.assertNotIn(result["id"], seen_ids)
+ seen_ids.append(result["id"])
+
+ def test_search_invalid_page(self):
+ with AsyncWriter(index.open_index()) as writer:
+ for i in range(15):
+ doc = Document.objects.create(
+ checksum=str(i),
+ pk=i + 1,
+ title=f"Document {i+1}",
+ content="content",
+ )
+ index.update_document(writer, doc)
+
+ response = self.client.get("/api/documents/?query=content&page=0&page_size=10")
+ self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
+ response = self.client.get("/api/documents/?query=content&page=3&page_size=10")
+ self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
+
+ @override_settings(
+ TIME_ZONE="UTC",
+ )
+ def test_search_added_in_last_week(self):
+ """
+ GIVEN:
+ - Three documents added right now
+ - The timezone is UTC time
+ WHEN:
+ - Query for documents added in the last 7 days
+ THEN:
+ - All three recent documents are returned
+ """
+ d1 = Document.objects.create(
+ title="invoice",
+ content="the thing i bought at a shop and paid with bank account",
+ checksum="A",
+ pk=1,
+ )
+ d2 = Document.objects.create(
+ title="bank statement 1",
+ content="things i paid for in august",
+ pk=2,
+ checksum="B",
+ )
+ d3 = Document.objects.create(
+ title="bank statement 3",
+ content="things i paid for in september",
+ pk=3,
+ checksum="C",
+ )
+ with index.open_index_writer() as writer:
+ index.update_document(writer, d1)
+ index.update_document(writer, d2)
+ index.update_document(writer, d3)
+
+ response = self.client.get("/api/documents/?query=added:[-1 week to now]")
+ results = response.data["results"]
+ # Expect 3 documents returned
+ self.assertEqual(len(results), 3)
+
+ for idx, subset in enumerate(
+ [
+ {"id": 1, "title": "invoice"},
+ {"id": 2, "title": "bank statement 1"},
+ {"id": 3, "title": "bank statement 3"},
+ ],
+ ):
+ result = results[idx]
+ # Assert subset in results
+ self.assertDictEqual(result, {**result, **subset})
+
+ @override_settings(
+ TIME_ZONE="America/Chicago",
+ )
+ def test_search_added_in_last_week_with_timezone_behind(self):
+ """
+ GIVEN:
+ - Two documents added right now
+ - One document added over a week ago
+ - The timezone is behind UTC time (-6)
+ WHEN:
+ - Query for documents added in the last 7 days
+ THEN:
+ - The two recent documents are returned
+ """
+ d1 = Document.objects.create(
+ title="invoice",
+ content="the thing i bought at a shop and paid with bank account",
+ checksum="A",
+ pk=1,
+ )
+ d2 = Document.objects.create(
+ title="bank statement 1",
+ content="things i paid for in august",
+ pk=2,
+ checksum="B",
+ )
+ d3 = Document.objects.create(
+ title="bank statement 3",
+ content="things i paid for in september",
+ pk=3,
+ checksum="C",
+ # 7 days, 1 hour and 1 minute ago
+ added=timezone.now() - timedelta(days=7, hours=1, minutes=1),
+ )
+ with index.open_index_writer() as writer:
+ index.update_document(writer, d1)
+ index.update_document(writer, d2)
+ index.update_document(writer, d3)
+
+ response = self.client.get("/api/documents/?query=added:[-1 week to now]")
+ results = response.data["results"]
+
+ # Expect 2 documents returned
+ self.assertEqual(len(results), 2)
+
+ for idx, subset in enumerate(
+ [{"id": 1, "title": "invoice"}, {"id": 2, "title": "bank statement 1"}],
+ ):
+ result = results[idx]
+ # Assert subset in results
+ self.assertDictEqual(result, {**result, **subset})
+
+ @override_settings(
+ TIME_ZONE="Europe/Sofia",
+ )
+ def test_search_added_in_last_week_with_timezone_ahead(self):
+ """
+ GIVEN:
+ - Two documents added right now
+ - One document added over a week ago
+ - The timezone is ahead of UTC time (+2)
+ WHEN:
+ - Query for documents added in the last 7 days
+ THEN:
+ - The two recent documents are returned
+ """
+ d1 = Document.objects.create(
+ title="invoice",
+ content="the thing i bought at a shop and paid with bank account",
+ checksum="A",
+ pk=1,
+ )
+ d2 = Document.objects.create(
+ title="bank statement 1",
+ content="things i paid for in august",
+ pk=2,
+ checksum="B",
+ )
+ d3 = Document.objects.create(
+ title="bank statement 3",
+ content="things i paid for in september",
+ pk=3,
+ checksum="C",
+ # 7 days, 1 hour and 1 minute ago
+ added=timezone.now() - timedelta(days=7, hours=1, minutes=1),
+ )
+ with index.open_index_writer() as writer:
+ index.update_document(writer, d1)
+ index.update_document(writer, d2)
+ index.update_document(writer, d3)
+
+ response = self.client.get("/api/documents/?query=added:[-1 week to now]")
+ results = response.data["results"]
+
+ # Expect 2 documents returned
+ self.assertEqual(len(results), 2)
+
+ for idx, subset in enumerate(
+ [{"id": 1, "title": "invoice"}, {"id": 2, "title": "bank statement 1"}],
+ ):
+ result = results[idx]
+ # Assert subset in results
+ self.assertDictEqual(result, {**result, **subset})
+
+ def test_search_added_in_last_month(self):
+ """
+ GIVEN:
+ - One document added right now
+ - One document added about a week ago
+ - One document added over 1 month ago
+ WHEN:
+ - Query for documents added in the last month
+ THEN:
+ - The two recent documents are returned
+ """
+ d1 = Document.objects.create(
+ title="invoice",
+ content="the thing i bought at a shop and paid with bank account",
+ checksum="A",
+ pk=1,
+ )
+ d2 = Document.objects.create(
+ title="bank statement 1",
+ content="things i paid for in august",
+ pk=2,
+ checksum="B",
+ # 1 month, 1 day ago
+ added=timezone.now() - relativedelta(months=1, days=1),
+ )
+ d3 = Document.objects.create(
+ title="bank statement 3",
+ content="things i paid for in september",
+ pk=3,
+ checksum="C",
+ # 7 days, 1 hour and 1 minute ago
+ added=timezone.now() - timedelta(days=7, hours=1, minutes=1),
+ )
+
+ with index.open_index_writer() as writer:
+ index.update_document(writer, d1)
+ index.update_document(writer, d2)
+ index.update_document(writer, d3)
+
+ response = self.client.get("/api/documents/?query=added:[-1 month to now]")
+ results = response.data["results"]
+
+ # Expect 2 documents returned
+ self.assertEqual(len(results), 2)
+
+ for idx, subset in enumerate(
+ [{"id": 1, "title": "invoice"}, {"id": 3, "title": "bank statement 3"}],
+ ):
+ result = results[idx]
+ # Assert subset in results
+ self.assertDictEqual(result, {**result, **subset})
+
+ @override_settings(
+ TIME_ZONE="America/Denver",
+ )
+ def test_search_added_in_last_month_timezone_behind(self):
+ """
+ GIVEN:
+ - One document added right now
+ - One document added about a week ago
+ - One document added over 1 month ago
+ - The timezone is behind UTC time (-6 or -7)
+ WHEN:
+ - Query for documents added in the last month
+ THEN:
+ - The two recent documents are returned
+ """
+ d1 = Document.objects.create(
+ title="invoice",
+ content="the thing i bought at a shop and paid with bank account",
+ checksum="A",
+ pk=1,
+ )
+ d2 = Document.objects.create(
+ title="bank statement 1",
+ content="things i paid for in august",
+ pk=2,
+ checksum="B",
+ # 1 month, 1 day ago
+ added=timezone.now() - relativedelta(months=1, days=1),
+ )
+ d3 = Document.objects.create(
+ title="bank statement 3",
+ content="things i paid for in september",
+ pk=3,
+ checksum="C",
+ # 7 days, 1 hour and 1 minute ago
+ added=timezone.now() - timedelta(days=7, hours=1, minutes=1),
+ )
+
+ with index.open_index_writer() as writer:
+ index.update_document(writer, d1)
+ index.update_document(writer, d2)
+ index.update_document(writer, d3)
+
+ response = self.client.get("/api/documents/?query=added:[-1 month to now]")
+ results = response.data["results"]
+
+ # Expect 2 documents returned
+ self.assertEqual(len(results), 2)
+
+ for idx, subset in enumerate(
+ [{"id": 1, "title": "invoice"}, {"id": 3, "title": "bank statement 3"}],
+ ):
+ result = results[idx]
+ # Assert subset in results
+ self.assertDictEqual(result, {**result, **subset})
+
+ @override_settings(
+ TIME_ZONE="Europe/Sofia",
+ )
+ def test_search_added_specific_date_with_timezone_ahead(self):
+ """
+ GIVEN:
+ - Two documents added right now
+ - One document added on a specific date
+ - The timezone is ahead of UTC time (+2)
+ WHEN:
+ - Query for documents added on a specific date
+ THEN:
+ - The one document is returned
+ """
+ d1 = Document.objects.create(
+ title="invoice",
+ content="the thing i bought at a shop and paid with bank account",
+ checksum="A",
+ pk=1,
+ )
+ d2 = Document.objects.create(
+ title="bank statement 1",
+ content="things i paid for in august",
+ pk=2,
+ checksum="B",
+ )
+ d3 = Document.objects.create(
+ title="bank statement 3",
+ content="things i paid for in september",
+ pk=3,
+ checksum="C",
+ # specific time zone aware date
+ added=timezone.make_aware(datetime.datetime(2023, 12, 1)),
+ )
+ # refresh doc instance to ensure we operate on date objects that Django uses
+ # Django converts dates to UTC
+ d3.refresh_from_db()
+
+ with index.open_index_writer() as writer:
+ index.update_document(writer, d1)
+ index.update_document(writer, d2)
+ index.update_document(writer, d3)
+
+ response = self.client.get("/api/documents/?query=added:20231201")
+ results = response.data["results"]
+
+ # Expect 1 document returned
+ self.assertEqual(len(results), 1)
+
+ for idx, subset in enumerate(
+ [{"id": 3, "title": "bank statement 3"}],
+ ):
+ result = results[idx]
+ # Assert subset in results
+ self.assertDictEqual(result, {**result, **subset})
+
+ @mock.patch("documents.index.autocomplete")
+ def test_search_autocomplete_limits(self, m):
+ """
+ GIVEN:
+ - No pre-conditions
+ WHEN:
+ - API request for autocomplete is made by user with various limit requests
+ THEN:
+ - Limit requests are validated
+ - Limit requests are obeyed
+ """
+
+ m.side_effect = lambda ix, term, limit, user: [term for _ in range(limit)]
+
+ response = self.client.get("/api/search/autocomplete/?term=test")
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self.assertEqual(len(response.data), 10)
+
+ response = self.client.get("/api/search/autocomplete/?term=test&limit=20")
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self.assertEqual(len(response.data), 20)
+
+ response = self.client.get("/api/search/autocomplete/?term=test&limit=-1")
+ self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
+
+ response = self.client.get("/api/search/autocomplete/")
+ self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
+
+ response = self.client.get("/api/search/autocomplete/?term=")
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self.assertEqual(len(response.data), 10)
+
+ def test_search_autocomplete_respect_permissions(self):
+ """
+ GIVEN:
+ - Multiple users and documents with & without permissions
+ WHEN:
+ - API request for autocomplete is made by user with or without permissions
+ THEN:
+ - Terms only within docs user has access to are returned
+ """
+ u1 = User.objects.create_user("user1")
+ u2 = User.objects.create_user("user2")
+
+ self.client.force_authenticate(user=u1)
+
+ d1 = Document.objects.create(
+ title="doc1",
+ content="apples",
+ checksum="1",
+ owner=u1,
+ )
+ d2 = Document.objects.create(
+ title="doc2",
+ content="applebaum",
+ checksum="2",
+ owner=u1,
+ )
+ d3 = Document.objects.create(
+ title="doc3",
+ content="appletini",
+ checksum="3",
+ owner=u1,
+ )
+
+ with AsyncWriter(index.open_index()) as writer:
+ index.update_document(writer, d1)
+ index.update_document(writer, d2)
+ index.update_document(writer, d3)
+
+ response = self.client.get("/api/search/autocomplete/?term=app")
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self.assertEqual(response.data, [b"apples", b"applebaum", b"appletini"])
+
+ d3.owner = u2
+
+ with AsyncWriter(index.open_index()) as writer:
+ index.update_document(writer, d3)
+
+ response = self.client.get("/api/search/autocomplete/?term=app")
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self.assertEqual(response.data, [b"apples", b"applebaum"])
+
+ assign_perm("view_document", u1, d3)
+
+ with AsyncWriter(index.open_index()) as writer:
+ index.update_document(writer, d3)
+
+ response = self.client.get("/api/search/autocomplete/?term=app")
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self.assertEqual(response.data, [b"apples", b"applebaum", b"appletini"])
+
+ def test_search_autocomplete_field_name_match(self):
+ """
+ GIVEN:
+ - One document exists in index (must be one document to experience the crash)
+ WHEN:
+ - API request for autocomplete is made with a query which looks like a schema field
+ THEN:
+ - No autocomplete terms are returned
+ - No UnicodeDecodeError due to weird binary data returned from index
+ """
+ d1 = Document.objects.create(
+ title="doc1",
+ content="my really neat document",
+ checksum="1",
+ )
+
+ with AsyncWriter(index.open_index()) as writer:
+ index.update_document(writer, d1)
+
+ response = self.client.get("/api/search/autocomplete/?term=created:2023")
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self.assertEqual(response.data, [])
+
+ @pytest.mark.skip(reason="Not implemented yet")
+ def test_search_spelling_correction(self):
+ with AsyncWriter(index.open_index()) as writer:
+ for i in range(55):
+ doc = Document.objects.create(
+ checksum=str(i),
+ pk=i + 1,
+ title=f"Document {i+1}",
+ content=f"Things document {i+1}",
+ )
+ index.update_document(writer, doc)
+
+ response = self.client.get("/api/search/?query=thing")
+ correction = response.data["corrected_query"]
+
+ self.assertEqual(correction, "things")
+
+ response = self.client.get("/api/search/?query=things")
+ correction = response.data["corrected_query"]
+
+ self.assertEqual(correction, None)
+
+ def test_search_more_like(self):
+ """
+ GIVEN:
+ - Documents exist which have similar content
+ - At least 1 document exists which is not similar in content
+ WHEN:
+ - API request for more like a given document
+ THEN:
+ - The similar documents are returned from the API request
+ """
+ d1 = Document.objects.create(
+ title="invoice",
+ content="the thing i bought at a shop and paid with bank account",
+ checksum="A",
+ pk=1,
+ )
+ d2 = Document.objects.create(
+ title="bank statement 1",
+ content="things i paid for in august",
+ pk=2,
+ checksum="B",
+ )
+ d3 = Document.objects.create(
+ title="bank statement 3",
+ content="things i paid for in september",
+ pk=3,
+ checksum="C",
+ )
+ d4 = Document.objects.create(
+ title="Monty Python & the Holy Grail",
+ content="And now for something completely different",
+ pk=4,
+ checksum="ABC",
+ )
+ with AsyncWriter(index.open_index()) as writer:
+ index.update_document(writer, d1)
+ index.update_document(writer, d2)
+ index.update_document(writer, d3)
+ index.update_document(writer, d4)
+
+ response = self.client.get(f"/api/documents/?more_like_id={d2.id}")
+
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+
+ results = response.data["results"]
+
+ self.assertEqual(len(results), 2)
+ self.assertEqual(results[0]["id"], d3.id)
+ self.assertEqual(results[1]["id"], d1.id)
+
+ def test_search_filtering(self):
+ t = Tag.objects.create(name="tag")
+ t2 = Tag.objects.create(name="tag2")
+ c = Correspondent.objects.create(name="correspondent")
+ c2 = Correspondent.objects.create(name="correspondent2")
+ dt = DocumentType.objects.create(name="type")
+ dt2 = DocumentType.objects.create(name="type2")
+ sp = StoragePath.objects.create(name="path")
+ sp2 = StoragePath.objects.create(name="path2")
+ cf1 = CustomField.objects.create(
+ name="string field",
+ data_type=CustomField.FieldDataType.STRING,
+ )
+ cf2 = CustomField.objects.create(
+ name="number field",
+ data_type=CustomField.FieldDataType.INT,
+ )
+
+ d1 = Document.objects.create(checksum="1", correspondent=c, content="test")
+ d2 = Document.objects.create(checksum="2", document_type=dt, content="test")
+ d3 = Document.objects.create(checksum="3", content="test")
+
+ d3.tags.add(t)
+ d3.tags.add(t2)
+ d4 = Document.objects.create(
+ checksum="4",
+ created=timezone.make_aware(datetime.datetime(2020, 7, 13)),
+ content="test",
+ original_filename="doc4.pdf",
+ )
+ d4.tags.add(t2)
+ d5 = Document.objects.create(
+ checksum="5",
+ added=timezone.make_aware(datetime.datetime(2020, 7, 13)),
+ content="test",
+ original_filename="doc5.pdf",
+ )
+ Document.objects.create(checksum="6", content="test2")
+ d7 = Document.objects.create(checksum="7", storage_path=sp, content="test")
+ d8 = Document.objects.create(
+ checksum="foo",
+ correspondent=c2,
+ document_type=dt2,
+ storage_path=sp2,
+ content="test",
+ )
+
+ cf1_d1 = CustomFieldInstance.objects.create(
+ document=d1,
+ field=cf1,
+ value_text="foobard1",
+ )
+ cf2_d1 = CustomFieldInstance.objects.create(
+ document=d1,
+ field=cf2,
+ value_int=999,
+ )
+ cf1_d4 = CustomFieldInstance.objects.create(
+ document=d4,
+ field=cf1,
+ value_text="foobard4",
+ )
+
+ with AsyncWriter(index.open_index()) as writer:
+ for doc in Document.objects.all():
+ index.update_document(writer, doc)
+
+ def search_query(q):
+ r = self.client.get("/api/documents/?query=test" + q)
+ self.assertEqual(r.status_code, status.HTTP_200_OK)
+ return [hit["id"] for hit in r.data["results"]]
+
+ self.assertCountEqual(
+ search_query(""),
+ [d1.id, d2.id, d3.id, d4.id, d5.id, d7.id, d8.id],
+ )
+ self.assertCountEqual(search_query("&is_tagged=true"), [d3.id, d4.id])
+ self.assertCountEqual(
+ search_query("&is_tagged=false"),
+ [d1.id, d2.id, d5.id, d7.id, d8.id],
+ )
+ self.assertCountEqual(search_query("&correspondent__id=" + str(c.id)), [d1.id])
+ self.assertCountEqual(
+ search_query(f"&correspondent__id__in={c.id},{c2.id}"),
+ [d1.id, d8.id],
+ )
+ self.assertCountEqual(
+ search_query("&correspondent__id__none=" + str(c.id)),
+ [d2.id, d3.id, d4.id, d5.id, d7.id, d8.id],
+ )
+ self.assertCountEqual(search_query("&document_type__id=" + str(dt.id)), [d2.id])
+ self.assertCountEqual(
+ search_query(f"&document_type__id__in={dt.id},{dt2.id}"),
+ [d2.id, d8.id],
+ )
+ self.assertCountEqual(
+ search_query("&document_type__id__none=" + str(dt.id)),
+ [d1.id, d3.id, d4.id, d5.id, d7.id, d8.id],
+ )
+ self.assertCountEqual(search_query("&storage_path__id=" + str(sp.id)), [d7.id])
+ self.assertCountEqual(
+ search_query(f"&storage_path__id__in={sp.id},{sp2.id}"),
+ [d7.id, d8.id],
+ )
+ self.assertCountEqual(
+ search_query("&storage_path__id__none=" + str(sp.id)),
+ [d1.id, d2.id, d3.id, d4.id, d5.id, d8.id],
+ )
+
+ self.assertCountEqual(
+ search_query("&storage_path__isnull=true"),
+ [d1.id, d2.id, d3.id, d4.id, d5.id],
+ )
+ self.assertCountEqual(
+ search_query("&correspondent__isnull=true"),
+ [d2.id, d3.id, d4.id, d5.id, d7.id],
+ )
+ self.assertCountEqual(
+ search_query("&document_type__isnull=true"),
+ [d1.id, d3.id, d4.id, d5.id, d7.id],
+ )
+ self.assertCountEqual(
+ search_query("&tags__id__all=" + str(t.id) + "," + str(t2.id)),
+ [d3.id],
+ )
+ self.assertCountEqual(search_query("&tags__id__all=" + str(t.id)), [d3.id])
+ self.assertCountEqual(
+ search_query("&tags__id__all=" + str(t2.id)),
+ [d3.id, d4.id],
+ )
+ self.assertCountEqual(
+ search_query(f"&tags__id__in={t.id},{t2.id}"),
+ [d3.id, d4.id],
+ )
+ self.assertCountEqual(
+ search_query(f"&tags__id__none={t.id},{t2.id}"),
+ [d1.id, d2.id, d5.id, d7.id, d8.id],
+ )
+
+ self.assertIn(
+ d4.id,
+ search_query(
+ "&created__date__lt="
+ + datetime.datetime(2020, 9, 2).strftime("%Y-%m-%d"),
+ ),
+ )
+ self.assertNotIn(
+ d4.id,
+ search_query(
+ "&created__date__gt="
+ + datetime.datetime(2020, 9, 2).strftime("%Y-%m-%d"),
+ ),
+ )
+
+ self.assertNotIn(
+ d4.id,
+ search_query(
+ "&created__date__lt="
+ + datetime.datetime(2020, 1, 2).strftime("%Y-%m-%d"),
+ ),
+ )
+ self.assertIn(
+ d4.id,
+ search_query(
+ "&created__date__gt="
+ + datetime.datetime(2020, 1, 2).strftime("%Y-%m-%d"),
+ ),
+ )
+
+ self.assertIn(
+ d5.id,
+ search_query(
+ "&added__date__lt="
+ + datetime.datetime(2020, 9, 2).strftime("%Y-%m-%d"),
+ ),
+ )
+ self.assertNotIn(
+ d5.id,
+ search_query(
+ "&added__date__gt="
+ + datetime.datetime(2020, 9, 2).strftime("%Y-%m-%d"),
+ ),
+ )
+
+ self.assertNotIn(
+ d5.id,
+ search_query(
+ "&added__date__lt="
+ + datetime.datetime(2020, 1, 2).strftime("%Y-%m-%d"),
+ ),
+ )
+
+ self.assertIn(
+ d5.id,
+ search_query(
+ "&added__date__gt="
+ + datetime.datetime(2020, 1, 2).strftime("%Y-%m-%d"),
+ ),
+ )
+
+ self.assertEqual(
+ search_query("&checksum__icontains=foo"),
+ [d8.id],
+ )
+
+ self.assertCountEqual(
+ search_query("&original_filename__istartswith=doc"),
+ [d4.id, d5.id],
+ )
+
+ self.assertIn(
+ d1.id,
+ search_query(
+ "&custom_fields__icontains=" + cf1_d1.value,
+ ),
+ )
+
+ self.assertIn(
+ d1.id,
+ search_query(
+ "&custom_fields__icontains=" + str(cf2_d1.value),
+ ),
+ )
+
+ self.assertIn(
+ d4.id,
+ search_query(
+ "&custom_fields__icontains=" + cf1_d4.value,
+ ),
+ )
+
+ def test_search_filtering_respect_owner(self):
+ """
+ GIVEN:
+ - Documents with owners set & without
+ WHEN:
+ - API request for advanced query (search) is made by non-superuser
+ - API request for advanced query (search) is made by superuser
+ THEN:
+ - Only owned docs are returned for regular users
+ - All docs are returned for superuser
+ """
+ superuser = User.objects.create_superuser("superuser")
+ u1 = User.objects.create_user("user1")
+ u2 = User.objects.create_user("user2")
+ u1.user_permissions.add(*Permission.objects.filter(codename="view_document"))
+ u2.user_permissions.add(*Permission.objects.filter(codename="view_document"))
+
+ Document.objects.create(checksum="1", content="test 1", owner=u1)
+ Document.objects.create(checksum="2", content="test 2", owner=u2)
+ Document.objects.create(checksum="3", content="test 3", owner=u2)
+ Document.objects.create(checksum="4", content="test 4")
+
+ with AsyncWriter(index.open_index()) as writer:
+ for doc in Document.objects.all():
+ index.update_document(writer, doc)
+
+ self.client.force_authenticate(user=u1)
+ r = self.client.get("/api/documents/?query=test")
+ self.assertEqual(r.data["count"], 2)
+ r = self.client.get("/api/documents/?query=test&document_type__id__none=1")
+ self.assertEqual(r.data["count"], 2)
+ r = self.client.get(f"/api/documents/?query=test&owner__id__none={u1.id}")
+ self.assertEqual(r.data["count"], 1)
+ r = self.client.get(f"/api/documents/?query=test&owner__id__in={u1.id}")
+ self.assertEqual(r.data["count"], 1)
+ r = self.client.get(
+ f"/api/documents/?query=test&owner__id__none={u1.id}&owner__isnull=true",
+ )
+ self.assertEqual(r.data["count"], 1)
+
+ self.client.force_authenticate(user=u2)
+ r = self.client.get("/api/documents/?query=test")
+ self.assertEqual(r.data["count"], 3)
+ r = self.client.get("/api/documents/?query=test&document_type__id__none=1")
+ self.assertEqual(r.data["count"], 3)
+ r = self.client.get(f"/api/documents/?query=test&owner__id__none={u2.id}")
+ self.assertEqual(r.data["count"], 1)
+
+ self.client.force_authenticate(user=superuser)
+ r = self.client.get("/api/documents/?query=test")
+ self.assertEqual(r.data["count"], 4)
+ r = self.client.get("/api/documents/?query=test&document_type__id__none=1")
+ self.assertEqual(r.data["count"], 4)
+ r = self.client.get(f"/api/documents/?query=test&owner__id__none={u1.id}")
+ self.assertEqual(r.data["count"], 3)
+
+ def test_search_filtering_with_object_perms(self):
+ """
+ GIVEN:
+ - Documents with granted view permissions to others
+ WHEN:
+ - API request for advanced query (search) is made by user
+ THEN:
+ - Only docs with granted view permissions are returned
+ """
+ u1 = User.objects.create_user("user1")
+ u2 = User.objects.create_user("user2")
+ u1.user_permissions.add(*Permission.objects.filter(codename="view_document"))
+ u2.user_permissions.add(*Permission.objects.filter(codename="view_document"))
+
+ Document.objects.create(checksum="1", content="test 1", owner=u1)
+ d2 = Document.objects.create(checksum="2", content="test 2", owner=u2)
+ d3 = Document.objects.create(checksum="3", content="test 3", owner=u2)
+ Document.objects.create(checksum="4", content="test 4")
+
+ with AsyncWriter(index.open_index()) as writer:
+ for doc in Document.objects.all():
+ index.update_document(writer, doc)
+
+ self.client.force_authenticate(user=u1)
+ r = self.client.get("/api/documents/?query=test")
+ self.assertEqual(r.data["count"], 2)
+ r = self.client.get("/api/documents/?query=test&document_type__id__none=1")
+ self.assertEqual(r.data["count"], 2)
+ r = self.client.get(f"/api/documents/?query=test&owner__id__none={u1.id}")
+ self.assertEqual(r.data["count"], 1)
+ r = self.client.get(f"/api/documents/?query=test&owner__id={u1.id}")
+ self.assertEqual(r.data["count"], 1)
+ r = self.client.get(f"/api/documents/?query=test&owner__id__in={u1.id}")
+ self.assertEqual(r.data["count"], 1)
+ r = self.client.get("/api/documents/?query=test&owner__isnull=true")
+ self.assertEqual(r.data["count"], 1)
+
+ assign_perm("view_document", u1, d2)
+ assign_perm("view_document", u1, d3)
+
+ with AsyncWriter(index.open_index()) as writer:
+ for doc in [d2, d3]:
+ index.update_document(writer, doc)
+
+ self.client.force_authenticate(user=u1)
+ r = self.client.get("/api/documents/?query=test")
+ self.assertEqual(r.data["count"], 4)
+ r = self.client.get("/api/documents/?query=test&document_type__id__none=1")
+ self.assertEqual(r.data["count"], 4)
+ r = self.client.get(f"/api/documents/?query=test&owner__id__none={u1.id}")
+ self.assertEqual(r.data["count"], 3)
+ r = self.client.get(f"/api/documents/?query=test&owner__id={u1.id}")
+ self.assertEqual(r.data["count"], 1)
+ r = self.client.get(f"/api/documents/?query=test&owner__id__in={u1.id}")
+ self.assertEqual(r.data["count"], 1)
+ r = self.client.get("/api/documents/?query=test&owner__isnull=true")
+ self.assertEqual(r.data["count"], 1)
+
+ def test_search_sorting(self):
+ u1 = User.objects.create_user("user1")
+ u2 = User.objects.create_user("user2")
+ c1 = Correspondent.objects.create(name="corres Ax")
+ c2 = Correspondent.objects.create(name="corres Cx")
+ c3 = Correspondent.objects.create(name="corres Bx")
+ d1 = Document.objects.create(
+ checksum="1",
+ correspondent=c1,
+ content="test",
+ archive_serial_number=2,
+ title="3",
+ owner=u1,
+ )
+ d2 = Document.objects.create(
+ checksum="2",
+ correspondent=c2,
+ content="test",
+ archive_serial_number=3,
+ title="2",
+ owner=u2,
+ )
+ d3 = Document.objects.create(
+ checksum="3",
+ correspondent=c3,
+ content="test",
+ archive_serial_number=1,
+ title="1",
+ )
+ Note.objects.create(
+ note="This is a note.",
+ document=d1,
+ user=u1,
+ )
+ Note.objects.create(
+ note="This is a note.",
+ document=d1,
+ user=u1,
+ )
+ Note.objects.create(
+ note="This is a note.",
+ document=d3,
+ user=u1,
+ )
+
+ with AsyncWriter(index.open_index()) as writer:
+ for doc in Document.objects.all():
+ index.update_document(writer, doc)
+
+ def search_query(q):
+ r = self.client.get("/api/documents/?query=test" + q)
+ self.assertEqual(r.status_code, status.HTTP_200_OK)
+ return [hit["id"] for hit in r.data["results"]]
+
+ self.assertListEqual(
+ search_query("&ordering=archive_serial_number"),
+ [d3.id, d1.id, d2.id],
+ )
+ self.assertListEqual(
+ search_query("&ordering=-archive_serial_number"),
+ [d2.id, d1.id, d3.id],
+ )
+ self.assertListEqual(search_query("&ordering=title"), [d3.id, d2.id, d1.id])
+ self.assertListEqual(search_query("&ordering=-title"), [d1.id, d2.id, d3.id])
+ self.assertListEqual(
+ search_query("&ordering=correspondent__name"),
+ [d1.id, d3.id, d2.id],
+ )
+ self.assertListEqual(
+ search_query("&ordering=-correspondent__name"),
+ [d2.id, d3.id, d1.id],
+ )
+ self.assertListEqual(
+ search_query("&ordering=num_notes"),
+ [d2.id, d3.id, d1.id],
+ )
+ self.assertListEqual(
+ search_query("&ordering=-num_notes"),
+ [d1.id, d3.id, d2.id],
+ )
+ self.assertListEqual(
+ search_query("&ordering=owner"),
+ [d1.id, d2.id, d3.id],
+ )
+ self.assertListEqual(
+ search_query("&ordering=-owner"),
+ [d3.id, d2.id, d1.id],
+ )