from django.contrib.auth.models import Permission
from django.contrib.auth.models import User
from django.contrib.contenttypes.models import ContentType
+from django.db.models import Count
+from django.db.models import IntegerField
+from django.db.models import OuterRef
from django.db.models import Q
from django.db.models import QuerySet
+from django.db.models import Subquery
+from django.db.models.functions import Cast
+from django.db.models.functions import Coalesce
from guardian.core import ObjectPermissionChecker
from guardian.models import GroupObjectPermission
+from guardian.models import UserObjectPermission
from guardian.shortcuts import assign_perm
from guardian.shortcuts import get_objects_for_user
from guardian.shortcuts import get_users_with_perms
)
+def _permitted_document_ids(user):
+ """
+ Return a queryset of document IDs the user may view, limited to non-deleted
+ documents. This intentionally avoids ``get_objects_for_user`` to keep the
+ subquery small and index-friendly.
+ """
+
+ base_docs = Document.objects.filter(deleted_at__isnull=True).only("id", "owner")
+
+ if user is None or not getattr(user, "is_authenticated", False):
+ # Just Anonymous user e.g. for drf-spectacular
+ return base_docs.filter(owner__isnull=True).values_list("id", flat=True)
+
+ if getattr(user, "is_superuser", False):
+ return base_docs.values_list("id", flat=True)
+
+ document_ct = ContentType.objects.get_for_model(Document)
+ perm_filter = {
+ "permission__codename": "view_document",
+ "permission__content_type": document_ct,
+ }
+
+ user_perm_docs = (
+ UserObjectPermission.objects.filter(user=user, **perm_filter)
+ .annotate(object_pk_int=Cast("object_pk", IntegerField()))
+ .values_list("object_pk_int", flat=True)
+ )
+
+ group_perm_docs = (
+ GroupObjectPermission.objects.filter(group__user=user, **perm_filter)
+ .annotate(object_pk_int=Cast("object_pk", IntegerField()))
+ .values_list("object_pk_int", flat=True)
+ )
+
+ permitted_documents = user_perm_docs.union(group_perm_docs)
+
+ return base_docs.filter(
+ Q(owner=user) | Q(owner__isnull=True) | Q(id__in=permitted_documents),
+ ).values_list("id", flat=True)
+
+
def get_document_count_filter_for_user(user):
"""
Return the Q object used to filter document counts for the given user.
+
+ The filter is expressed as an ``id__in`` against a small subquery of permitted
+ document IDs to keep the generated SQL simple and avoid large OR clauses.
"""
- if user is None or not getattr(user, "is_authenticated", False):
- return Q(documents__deleted_at__isnull=True, documents__owner__isnull=True)
if getattr(user, "is_superuser", False):
+ # Superuser: no permission filtering needed
return Q(documents__deleted_at__isnull=True)
- return Q(
- documents__deleted_at__isnull=True,
- documents__id__in=get_objects_for_user_owner_aware(
- user,
- "documents.view_document",
- Document,
- ).values_list("id", flat=True),
+
+ permitted_ids = _permitted_document_ids(user)
+ return Q(documents__id__in=permitted_ids)
+
+
+def annotate_document_count_for_related_queryset(
+ queryset,
+ through_model,
+ related_object_field: str,
+ target_field: str = "document_id",
+ user=None,
+):
+ """
+ Annotate a queryset with permissions-aware document counts using a subquery
+ against a relation table.
+
+ Args:
+ queryset: base queryset to annotate (must contain pk)
+ through_model: model representing the relation (e.g., Document.tags.through
+ or CustomFieldInstance)
+ source_field: field on the relation pointing back to queryset pk
+ target_field: field on the relation pointing to Document id
+ user: the user for whom to filter permitted document ids
+ """
+
+ permitted_ids = _permitted_document_ids(user)
+ counts = (
+ through_model.objects.filter(
+ **{
+ related_object_field: OuterRef("pk"),
+ f"{target_field}__in": permitted_ids,
+ },
+ )
+ .values(related_object_field)
+ .annotate(c=Count(target_field))
+ .values("c")
)
+ return queryset.annotate(document_count=Coalesce(Subquery(counts[:1]), 0))
def get_objects_for_user_owner_aware(user, perms, Model) -> QuerySet:
from django.db.models import IntegerField
from django.db.models import Max
from django.db.models import Model
-from django.db.models import Q
from django.db.models import Sum
from django.db.models import When
from django.db.models.functions import Length
from documents.matching import match_tags
from documents.models import Correspondent
from documents.models import CustomField
+from documents.models import CustomFieldInstance
from documents.models import Document
from documents.models import DocumentType
from documents.models import Note
from documents.permissions import PaperlessNotePermissions
from documents.permissions import PaperlessObjectPermissions
from documents.permissions import ViewDocumentsPermissions
+from documents.permissions import annotate_document_count_for_related_queryset
from documents.permissions import get_document_count_filter_for_user
from documents.permissions import get_objects_for_user_owner_aware
from documents.permissions import has_perms_owner_aware
Mixin to add document count to queryset, permissions-aware if needed
"""
+ # Default is simple relation path, override for through-table/count specialization.
+ document_count_through = None
+ document_count_source_field = None
+
def get_document_count_filter(self):
request = getattr(self, "request", None)
user = getattr(request, "user", None) if request else None
return get_document_count_filter_for_user(user)
def get_queryset(self):
+ base_qs = super().get_queryset()
+
+ # Use optimized through-table counting when configured.
+ if self.document_count_through:
+ user = getattr(getattr(self, "request", None), "user", None)
+ return annotate_document_count_for_related_queryset(
+ base_qs,
+ through_model=self.document_count_through,
+ related_object_field=self.document_count_source_field,
+ user=user,
+ )
+
+ # Fallback: simple Count on relation with permission filter.
filter = self.get_document_count_filter()
- return (
- super()
- .get_queryset()
- .annotate(document_count=Count("documents", filter=filter))
+ return base_qs.annotate(
+ document_count=Count("documents", filter=filter),
)
@extend_schema_view(**generate_object_with_permissions_schema(CorrespondentSerializer))
-class CorrespondentViewSet(ModelViewSet, PermissionsAwareDocumentCountMixin):
+class CorrespondentViewSet(PermissionsAwareDocumentCountMixin, ModelViewSet):
model = Correspondent
queryset = Correspondent.objects.select_related("owner").order_by(Lower("name"))
@extend_schema_view(**generate_object_with_permissions_schema(TagSerializer))
-class TagViewSet(ModelViewSet, PermissionsAwareDocumentCountMixin):
+class TagViewSet(PermissionsAwareDocumentCountMixin, ModelViewSet):
model = Tag
+ document_count_through = Document.tags.through
+ document_count_source_field = "tag_id"
queryset = Tag.objects.select_related("owner").order_by(
Lower("name"),
descendant_pks = {pk for tag in all_tags for pk in tag.get_descendants_pks()}
if descendant_pks:
- filter_q = self.get_document_count_filter()
+ user = getattr(getattr(self, "request", None), "user", None)
children_source = list(
- Tag.objects.filter(pk__in=descendant_pks | {t.pk for t in all_tags})
- .select_related("owner")
- .annotate(document_count=Count("documents", filter=filter_q))
- .order_by(*ordering),
+ annotate_document_count_for_related_queryset(
+ Tag.objects.filter(pk__in=descendant_pks | {t.pk for t in all_tags})
+ .select_related("owner")
+ .order_by(*ordering),
+ through_model=self.document_count_through,
+ related_object_field=self.document_count_source_field,
+ user=user,
+ ),
)
else:
children_source = all_tags
@extend_schema_view(**generate_object_with_permissions_schema(DocumentTypeSerializer))
-class DocumentTypeViewSet(ModelViewSet, PermissionsAwareDocumentCountMixin):
+class DocumentTypeViewSet(PermissionsAwareDocumentCountMixin, ModelViewSet):
model = DocumentType
queryset = DocumentType.objects.select_related("owner").order_by(Lower("name"))
@extend_schema_view(**generate_object_with_permissions_schema(StoragePathSerializer))
-class StoragePathViewSet(ModelViewSet, PermissionsAwareDocumentCountMixin):
+class StoragePathViewSet(PermissionsAwareDocumentCountMixin, ModelViewSet):
model = StoragePath
queryset = StoragePath.objects.select_related("owner").order_by(
)
-class CustomFieldViewSet(ModelViewSet):
+class CustomFieldViewSet(PermissionsAwareDocumentCountMixin, ModelViewSet):
permission_classes = (IsAuthenticated, PaperlessObjectPermissions)
serializer_class = CustomFieldSerializer
filterset_class = CustomFieldFilterSet
model = CustomField
+ document_count_through = CustomFieldInstance
+ document_count_source_field = "field_id"
queryset = CustomField.objects.all().order_by("-created")
- def get_queryset(self):
- filter = (
- Q(fields__document__deleted_at__isnull=True)
- if self.request.user is None or self.request.user.is_superuser
- else (
- Q(
- fields__document__deleted_at__isnull=True,
- fields__document__id__in=get_objects_for_user_owner_aware(
- self.request.user,
- "documents.view_document",
- Document,
- ).values_list("id", flat=True),
- )
- )
- )
- return (
- super()
- .get_queryset()
- .annotate(
- document_count=Count(
- "fields",
- filter=filter,
- ),
- )
- )
-
@extend_schema_view(
get=extend_schema(