search_index() {
- local -r index_version=5
+ local -r index_version=6
local -r index_version_file=${DATA_DIR}/.index_version
if [[ (! -f "${index_version_file}") || $(<"${index_version_file}") != "$index_version" ]]; then
"created": DATE_KWARGS,
"added": DATE_KWARGS,
"modified": DATE_KWARGS,
+ "original_filename": CHAR_KWARGS,
+ "checksum": CHAR_KWARGS,
"correspondent": ["isnull"],
"correspondent__id": ID_KWARGS,
"correspondent__name": CHAR_KWARGS,
owner_id=NUMERIC(),
has_owner=BOOLEAN(),
viewer_id=KEYWORD(commas=True),
+ checksum=TEXT(),
+ original_filename=TEXT(sortable=True),
)
owner_id=doc.owner.id if doc.owner else None,
has_owner=doc.owner is not None,
viewer_id=viewer_ids if viewer_ids else None,
+ checksum=doc.checksum,
+ original_filename=doc.original_filename,
)
class DelayedQuery:
+ param_map = {
+ "correspondent": ("correspondent", ["id", "id__in", "id__none", "isnull"]),
+ "document_type": ("type", ["id", "id__in", "id__none", "isnull"]),
+ "storage_path": ("path", ["id", "id__in", "id__none", "isnull"]),
+ "owner": ("owner", ["id", "id__in", "id__none", "isnull"]),
+ "tags": ("tag", ["id__all", "id__in", "id__none"]),
+ "added": ("added", ["date__lt", "date__gt"]),
+ "created": ("created", ["date__lt", "date__gt"]),
+ "checksum": ("checksum", ["icontains", "istartswith"]),
+ "original_filename": ("original_filename", ["icontains", "istartswith"]),
+ }
+
def _get_query(self):
raise NotImplementedError
def _get_query_filter(self):
criterias = []
- for k, v in self.query_params.items():
- if k == "correspondent__id":
- criterias.append(query.Term("correspondent_id", v))
- elif k == "correspondent__id__in":
- correspondents_in = []
- for correspondent_id in v.split(","):
- correspondents_in.append(
- query.Term("correspondent_id", correspondent_id),
+ for key, value in self.query_params.items():
+ # is_tagged is a special case
+ if key == "is_tagged":
+ criterias.append(query.Term("has_tag", self.evalBoolean(value)))
+ continue
+
+ # Don't process query params without a filter
+ if "__" not in key:
+ continue
+
+ # All other query params consist of a parameter and a query filter
+ param, query_filter = key.split("__", 1)
+ try:
+ field, supported_query_filters = self.param_map[param]
+ except KeyError:
+ logger.error("Unable to build a query filter for parameter %s", key)
+ continue
+
+ # We only support certain filters per parameter
+ if query_filter not in supported_query_filters:
+ logger.info(
+ f"Query filter {query_filter} not supported for parameter {param}",
+ )
+ continue
+
+ if query_filter == "id":
+ criterias.append(query.Term(f"{field}_id", value))
+ elif query_filter == "id__in":
+ in_filter = []
+ for object_id in value.split(","):
+ in_filter.append(
+ query.Term(f"{field}_id", object_id),
)
- criterias.append(query.Or(correspondents_in))
- elif k == "correspondent__id__none":
- for correspondent_id in v.split(","):
+ criterias.append(query.Or(in_filter))
+ elif query_filter == "id__none":
+ for object_id in value.split(","):
criterias.append(
- query.Not(query.Term("correspondent_id", correspondent_id)),
+ query.Not(query.Term(f"{field}_id", object_id)),
)
- elif k == "tags__id__all":
- for tag_id in v.split(","):
- criterias.append(query.Term("tag_id", tag_id))
- elif k == "tags__id__none":
- for tag_id in v.split(","):
- criterias.append(query.Not(query.Term("tag_id", tag_id)))
- elif k == "tags__id__in":
- tags_in = []
- for tag_id in v.split(","):
- tags_in.append(query.Term("tag_id", tag_id))
- criterias.append(query.Or(tags_in))
- elif k == "document_type__id":
- criterias.append(query.Term("type_id", v))
- elif k == "document_type__id__in":
- document_types_in = []
- for document_type_id in v.split(","):
- document_types_in.append(query.Term("type_id", document_type_id))
- criterias.append(query.Or(document_types_in))
- elif k == "document_type__id__none":
- for document_type_id in v.split(","):
- criterias.append(query.Not(query.Term("type_id", document_type_id)))
- elif k == "correspondent__isnull":
+ elif query_filter == "isnull":
+ criterias.append(
+ query.Term(f"has_{field}", self.evalBoolean(value) is False),
+ )
+ elif query_filter == "id__all":
+ for object_id in value.split(","):
+ criterias.append(query.Term(f"{field}_id", object_id))
+ elif query_filter == "date__lt":
+ criterias.append(
+ query.DateRange(field, start=None, end=isoparse(value)),
+ )
+ elif query_filter == "date__gt":
criterias.append(
- query.Term("has_correspondent", self.evalBoolean(v) is False),
+ query.DateRange(field, start=isoparse(value), end=None),
)
- elif k == "is_tagged":
- criterias.append(query.Term("has_tag", self.evalBoolean(v)))
- elif k == "document_type__isnull":
- criterias.append(query.Term("has_type", self.evalBoolean(v) is False))
- elif k == "created__date__lt":
+ elif query_filter == "icontains":
criterias.append(
- query.DateRange("created", start=None, end=isoparse(v)),
+ query.Term(field, value),
)
- elif k == "created__date__gt":
+ elif query_filter == "istartswith":
criterias.append(
- query.DateRange("created", start=isoparse(v), end=None),
+ query.Prefix(field, value),
)
- elif k == "added__date__gt":
- criterias.append(query.DateRange("added", start=isoparse(v), end=None))
- elif k == "added__date__lt":
- criterias.append(query.DateRange("added", start=None, end=isoparse(v)))
- elif k == "storage_path__id":
- criterias.append(query.Term("path_id", v))
- elif k == "storage_path__id__in":
- storage_paths_in = []
- for storage_path_id in v.split(","):
- storage_paths_in.append(query.Term("path_id", storage_path_id))
- criterias.append(query.Or(storage_paths_in))
- elif k == "storage_path__id__none":
- for storage_path_id in v.split(","):
- criterias.append(query.Not(query.Term("path_id", storage_path_id)))
- elif k == "storage_path__isnull":
- criterias.append(query.Term("has_path", self.evalBoolean(v) is False))
- elif k == "owner__isnull":
- criterias.append(query.Term("has_owner", self.evalBoolean(v) is False))
- elif k == "owner__id":
- criterias.append(query.Term("owner_id", v))
- elif k == "owner__id__in":
- owners_in = []
- for owner_id in v.split(","):
- owners_in.append(query.Term("owner_id", owner_id))
- criterias.append(query.Or(owners_in))
- elif k == "owner__id__none":
- for owner_id in v.split(","):
- criterias.append(query.Not(query.Term("owner_id", owner_id)))
user_criterias = get_permissions_criterias(
user=self.user,
results = response.data["results"]
self.assertEqual(len(results), 0)
+ def test_document_checksum_filter(self):
+ Document.objects.create(
+ title="none1",
+ checksum="A",
+ mime_type="application/pdf",
+ )
+ doc2 = Document.objects.create(
+ title="none2",
+ checksum="B",
+ mime_type="application/pdf",
+ )
+ Document.objects.create(
+ title="none3",
+ checksum="C",
+ mime_type="application/pdf",
+ )
+
+ response = self.client.get("/api/documents/?checksum__iexact=B")
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ results = response.data["results"]
+ self.assertEqual(len(results), 1)
+ self.assertEqual(results[0]["id"], doc2.id)
+
+ response = self.client.get("/api/documents/?checksum__iexact=X")
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ results = response.data["results"]
+ self.assertEqual(len(results), 0)
+
+ def test_document_original_filename_filter(self):
+ doc1 = Document.objects.create(
+ title="none1",
+ checksum="A",
+ mime_type="application/pdf",
+ original_filename="docA.pdf",
+ )
+ doc2 = Document.objects.create(
+ title="none2",
+ checksum="B",
+ mime_type="application/pdf",
+ original_filename="docB.pdf",
+ )
+ doc3 = Document.objects.create(
+ title="none3",
+ checksum="C",
+ mime_type="application/pdf",
+ original_filename="docC.pdf",
+ )
+
+ response = self.client.get("/api/documents/?original_filename__iexact=DOCa.pdf")
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ results = response.data["results"]
+ self.assertEqual(len(results), 1)
+ self.assertEqual(results[0]["id"], doc1.id)
+
+ response = self.client.get("/api/documents/?original_filename__iexact=docx.pdf")
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ results = response.data["results"]
+ self.assertEqual(len(results), 0)
+
+ response = self.client.get("/api/documents/?original_filename__istartswith=dOc")
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ results = response.data["results"]
+ self.assertEqual(len(results), 3)
+ self.assertCountEqual(
+ [results[0]["id"], results[1]["id"], results[2]["id"]],
+ [doc1.id, doc2.id, doc3.id],
+ )
+
def test_documents_title_content_filter(self):
doc1 = Document.objects.create(
title="title A",
checksum="4",
created=timezone.make_aware(datetime.datetime(2020, 7, 13)),
content="test",
+ original_filename="doc4.pdf",
)
d4.tags.add(t2)
d5 = Document.objects.create(
checksum="5",
added=timezone.make_aware(datetime.datetime(2020, 7, 13)),
content="test",
+ original_filename="doc5.pdf",
)
Document.objects.create(checksum="6", content="test2")
d7 = Document.objects.create(checksum="7", storage_path=sp, content="test")
d8 = Document.objects.create(
- checksum="8",
+ checksum="foo",
correspondent=c2,
document_type=dt2,
storage_path=sp2,
),
)
+ self.assertEqual(
+ search_query("&checksum__icontains=foo"),
+ [d8.id],
+ )
+
+ self.assertCountEqual(
+ search_query("&original_filename__istartswith=doc"),
+ [d4.id, d5.id],
+ )
+
def test_search_filtering_respect_owner(self):
"""
GIVEN:
--- /dev/null
+from dateutil.parser import isoparse
+from django.test import TestCase
+from whoosh import query
+
+from documents.index import DelayedQuery
+from documents.index import get_permissions_criterias
+from documents.models import User
+
+
+class TestDelayedQuery(TestCase):
+ def setUp(self):
+ super().setUp()
+ # all tests run without permission criteria, so has_no_owner query will always
+ # be appended.
+ self.has_no_owner = query.Or([query.Term("has_owner", False)])
+
+ def _get_testset__id__in(self, param, field):
+ return (
+ {f"{param}__id__in": "42,43"},
+ query.And(
+ [
+ query.Or(
+ [
+ query.Term(f"{field}_id", "42"),
+ query.Term(f"{field}_id", "43"),
+ ],
+ ),
+ self.has_no_owner,
+ ],
+ ),
+ )
+
+ def _get_testset__id__none(self, param, field):
+ return (
+ {f"{param}__id__none": "42,43"},
+ query.And(
+ [
+ query.Not(query.Term(f"{field}_id", "42")),
+ query.Not(query.Term(f"{field}_id", "43")),
+ self.has_no_owner,
+ ],
+ ),
+ )
+
+ def test_get_permission_criteria(self):
+ # tests contains touples of user instances and the expected filter
+ tests = (
+ (None, [query.Term("has_owner", False)]),
+ (User(42, username="foo", is_superuser=True), []),
+ (
+ User(42, username="foo", is_superuser=False),
+ [
+ query.Term("has_owner", False),
+ query.Term("owner_id", 42),
+ query.Term("viewer_id", "42"),
+ ],
+ ),
+ )
+ for user, expected in tests:
+ self.assertEqual(get_permissions_criterias(user), expected)
+
+ def test_no_query_filters(self):
+ dq = DelayedQuery(None, {}, None, None)
+ self.assertEqual(dq._get_query_filter(), self.has_no_owner)
+
+ def test_date_query_filters(self):
+ def _get_testset(param: str):
+ date_str = "1970-01-01T02:44"
+ date_obj = isoparse(date_str)
+ return (
+ (
+ {f"{param}__date__lt": date_str},
+ query.And(
+ [
+ query.DateRange(param, start=None, end=date_obj),
+ self.has_no_owner,
+ ],
+ ),
+ ),
+ (
+ {f"{param}__date__gt": date_str},
+ query.And(
+ [
+ query.DateRange(param, start=date_obj, end=None),
+ self.has_no_owner,
+ ],
+ ),
+ ),
+ )
+
+ query_params = ["created", "added"]
+ for param in query_params:
+ for params, expected in _get_testset(param):
+ dq = DelayedQuery(None, params, None, None)
+ got = dq._get_query_filter()
+ self.assertCountEqual(got, expected)
+
+ def test_is_tagged_query_filter(self):
+ tests = (
+ ("True", True),
+ ("true", True),
+ ("1", True),
+ ("False", False),
+ ("false", False),
+ ("0", False),
+ ("foo", False),
+ )
+ for param, expected in tests:
+ dq = DelayedQuery(None, {"is_tagged": param}, None, None)
+ self.assertEqual(
+ dq._get_query_filter(),
+ query.And([query.Term("has_tag", expected), self.has_no_owner]),
+ )
+
+ def test_tags_query_filters(self):
+ # tests contains touples of query_parameter dics and the expected whoosh query
+ param = "tags"
+ field, _ = DelayedQuery.param_map[param]
+ tests = (
+ (
+ {f"{param}__id__all": "42,43"},
+ query.And(
+ [
+ query.Term(f"{field}_id", "42"),
+ query.Term(f"{field}_id", "43"),
+ self.has_no_owner,
+ ],
+ ),
+ ),
+ # tags does not allow __id
+ (
+ {f"{param}__id": "42"},
+ self.has_no_owner,
+ ),
+ # tags does not allow __isnull
+ (
+ {f"{param}__isnull": "true"},
+ self.has_no_owner,
+ ),
+ self._get_testset__id__in(param, field),
+ self._get_testset__id__none(param, field),
+ )
+
+ for params, expected in tests:
+ dq = DelayedQuery(None, params, None, None)
+ got = dq._get_query_filter()
+ self.assertCountEqual(got, expected)
+
+ def test_generic_query_filters(self):
+ def _get_testset(param: str):
+ field, _ = DelayedQuery.param_map[param]
+ return (
+ (
+ {f"{param}__id": "42"},
+ query.And(
+ [
+ query.Term(f"{field}_id", "42"),
+ self.has_no_owner,
+ ],
+ ),
+ ),
+ self._get_testset__id__in(param, field),
+ self._get_testset__id__none(param, field),
+ (
+ {f"{param}__isnull": "true"},
+ query.And(
+ [
+ query.Term(f"has_{field}", False),
+ self.has_no_owner,
+ ],
+ ),
+ ),
+ (
+ {f"{param}__isnull": "false"},
+ query.And(
+ [
+ query.Term(f"has_{field}", True),
+ self.has_no_owner,
+ ],
+ ),
+ ),
+ )
+
+ query_params = ["correspondent", "document_type", "storage_path", "owner"]
+ for param in query_params:
+ for params, expected in _get_testset(param):
+ dq = DelayedQuery(None, params, None, None)
+ got = dq._get_query_filter()
+ self.assertCountEqual(got, expected)
+
+ def test_char_query_filter(self):
+ def _get_testset(param: str):
+ return (
+ (
+ {f"{param}__icontains": "foo"},
+ query.And(
+ [
+ query.Term(f"{param}", "foo"),
+ self.has_no_owner,
+ ],
+ ),
+ ),
+ (
+ {f"{param}__istartswith": "foo"},
+ query.And(
+ [
+ query.Prefix(f"{param}", "foo"),
+ self.has_no_owner,
+ ],
+ ),
+ ),
+ )
+
+ query_params = ["checksum", "original_filename"]
+ for param in query_params:
+ for params, expected in _get_testset(param):
+ dq = DelayedQuery(None, params, None, None)
+ got = dq._get_query_filter()
+ self.assertCountEqual(got, expected)