break
if tag:
- tag = Tag.objects.get_or_create(
+ tag, _ = Tag.objects.get_or_create(
name__iexact=tag,
defaults={"name": tag},
- )[0]
+ )
logger.debug(
f"Found Tag Barcode '{raw}', substituted "
logger = logging.getLogger("paperless.bulk_edit")
-def set_correspondent(doc_ids, correspondent):
+def set_correspondent(doc_ids: list[int], correspondent):
+
if correspondent:
- correspondent = Correspondent.objects.get(id=correspondent)
+ correspondent = Correspondent.objects.only("pk").get(id=correspondent)
- qs = Document.objects.filter(Q(id__in=doc_ids) & ~Q(correspondent=correspondent))
- affected_docs = [doc.id for doc in qs]
+ qs = (
+ Document.objects.filter(Q(id__in=doc_ids) & ~Q(correspondent=correspondent))
+ .select_related("correspondent")
+ .only("pk", "correspondent__id")
+ )
+ affected_docs = list(qs.values_list("pk", flat=True))
qs.update(correspondent=correspondent)
bulk_update_documents.delay(document_ids=affected_docs)
return "OK"
-def set_storage_path(doc_ids, storage_path):
+def set_storage_path(doc_ids: list[int], storage_path):
if storage_path:
- storage_path = StoragePath.objects.get(id=storage_path)
+ storage_path = StoragePath.objects.only("pk").get(id=storage_path)
- qs = Document.objects.filter(
- Q(id__in=doc_ids) & ~Q(storage_path=storage_path),
+ qs = (
+ Document.objects.filter(
+ Q(id__in=doc_ids) & ~Q(storage_path=storage_path),
+ )
+ .select_related("storage_path")
+ .only("pk", "storage_path__id")
)
- affected_docs = [doc.id for doc in qs]
+ affected_docs = list(qs.values_list("pk", flat=True))
qs.update(storage_path=storage_path)
bulk_update_documents.delay(
return "OK"
-def set_document_type(doc_ids, document_type):
+def set_document_type(doc_ids: list[int], document_type):
if document_type:
- document_type = DocumentType.objects.get(id=document_type)
+ document_type = DocumentType.objects.only("pk").get(id=document_type)
- qs = Document.objects.filter(Q(id__in=doc_ids) & ~Q(document_type=document_type))
- affected_docs = [doc.id for doc in qs]
+ qs = (
+ Document.objects.filter(Q(id__in=doc_ids) & ~Q(document_type=document_type))
+ .select_related("document_type")
+ .only("pk", "document_type__id")
+ )
+ affected_docs = list(qs.values_list("pk", flat=True))
qs.update(document_type=document_type)
bulk_update_documents.delay(document_ids=affected_docs)
return "OK"
-def add_tag(doc_ids, tag):
- qs = Document.objects.filter(Q(id__in=doc_ids) & ~Q(tags__id=tag))
- affected_docs = [doc.id for doc in qs]
+def add_tag(doc_ids: list[int], tag: int):
+
+ qs = Document.objects.filter(Q(id__in=doc_ids) & ~Q(tags__id=tag)).only("pk")
+ affected_docs = list(qs.values_list("pk", flat=True))
DocumentTagRelationship = Document.tags.through
return "OK"
-def remove_tag(doc_ids, tag):
- qs = Document.objects.filter(Q(id__in=doc_ids) & Q(tags__id=tag))
- affected_docs = [doc.id for doc in qs]
+def remove_tag(doc_ids: list[int], tag: int):
+
+ qs = Document.objects.filter(Q(id__in=doc_ids) & Q(tags__id=tag)).only("pk")
+ affected_docs = list(qs.values_list("pk", flat=True))
DocumentTagRelationship = Document.tags.through
return "OK"
-def modify_tags(doc_ids, add_tags, remove_tags):
- qs = Document.objects.filter(id__in=doc_ids)
- affected_docs = [doc.id for doc in qs]
+def modify_tags(doc_ids: list[int], add_tags: list[int], remove_tags: list[int]):
+ qs = Document.objects.filter(id__in=doc_ids).only("pk")
+ affected_docs = list(qs.values_list("pk", flat=True))
DocumentTagRelationship = Document.tags.through
return "OK"
-def modify_custom_fields(doc_ids, add_custom_fields, remove_custom_fields):
- qs = Document.objects.filter(id__in=doc_ids)
- affected_docs = [doc.id for doc in qs]
+def modify_custom_fields(doc_ids: list[int], add_custom_fields, remove_custom_fields):
+ qs = Document.objects.filter(id__in=doc_ids).only("pk")
+ affected_docs = list(qs.values_list("pk", flat=True))
fields_to_add = []
for field in add_custom_fields:
return "OK"
-def delete(doc_ids):
+def delete(doc_ids: list[int]):
Document.objects.filter(id__in=doc_ids).delete()
from documents import index
return "OK"
-def redo_ocr(doc_ids):
+def redo_ocr(doc_ids: list[int]):
for document_id in doc_ids:
update_document_archive_file.delay(
document_id=document_id,
return "OK"
-def set_permissions(doc_ids, set_permissions, owner=None, merge=False):
- qs = Document.objects.filter(id__in=doc_ids)
+def set_permissions(doc_ids: list[int], set_permissions, owner=None, merge=False):
+ qs = Document.objects.filter(id__in=doc_ids).select_related("owner")
if merge:
# If merging, only set owner for documents that don't have an owner
for doc in qs:
set_permissions_for_object(permissions=set_permissions, object=doc, merge=merge)
- affected_docs = [doc.id for doc in qs]
+ affected_docs = list(qs.values_list("pk", flat=True))
bulk_update_documents.delay(document_ids=affected_docs)
# The metadata exists in the cache
if doc_metadata is not None:
try:
- doc = Document.objects.get(pk=document_id)
+ doc = Document.objects.only(
+ "pk",
+ "checksum",
+ "archive_checksum",
+ "archive_filename",
+ ).get(pk=document_id)
# The original checksums match
# If it has one, the archive checksums match
# Then, we can use the metadata
from paperless.db import GnuPG
try:
- encrypted_doc = Document.objects.filter(
- storage_type=Document.STORAGE_TYPE_GPG,
- ).first()
+ encrypted_doc = (
+ Document.objects.filter(
+ storage_type=Document.STORAGE_TYPE_GPG,
+ )
+ .only("pk", "storage_type")
+ .first()
+ )
except (OperationalError, ProgrammingError, FieldError):
return [] # No documents table yet
def train(self):
# Get non-inbox documents
- docs_queryset = Document.objects.exclude(
- tags__is_inbox_tag=True,
+ docs_queryset = (
+ Document.objects.exclude(
+ tags__is_inbox_tag=True,
+ )
+ .select_related("document_type", "correspondent", "storage_path")
+ .prefetch_related("tags")
)
# No documents exit to train against
ETag
"""
try:
- doc = Document.objects.get(pk=pk)
+ doc = Document.objects.only("checksum").get(pk=pk)
return doc.checksum
except Document.DoesNotExist: # pragma: no cover
return None
error on the side of more cautious
"""
try:
- doc = Document.objects.get(pk=pk)
+ doc = Document.objects.only("modified").get(pk=pk)
return doc.modified
except Document.DoesNotExist: # pragma: no cover
return None
ETag for the document preview, using the original or archive checksum, depending on the request
"""
try:
- doc = Document.objects.get(pk=pk)
+ doc = Document.objects.only("checksum", "archive_checksum").get(pk=pk)
use_original = (
"original" in request.query_params
and request.query_params["original"] == "true"
speaking correct, but close enough and quick
"""
try:
- doc = Document.objects.get(pk=pk)
+ doc = Document.objects.only("modified").get(pk=pk)
return doc.modified
except Document.DoesNotExist: # pragma: no cover
return None
Cache should be (slightly?) faster than filesystem
"""
try:
- doc = Document.objects.get(pk=pk)
+ doc = Document.objects.only("storage_type").get(pk=pk)
if not doc.thumbnail_path.exists():
return None
doc_key = get_thumbnail_modified_key(pk)
from django.contrib.auth.models import Permission
from django.contrib.auth.models import User
from django.contrib.contenttypes.models import ContentType
+from django.db.models import QuerySet
from guardian.core import ObjectPermissionChecker
from guardian.models import GroupObjectPermission
from guardian.shortcuts import assign_perm
)
-def get_objects_for_user_owner_aware(user, perms, Model):
+def get_objects_for_user_owner_aware(user, perms, Model) -> QuerySet:
objects_owned = Model.objects.filter(owner=user)
objects_unowned = Model.objects.filter(owner__isnull=True)
objects_with_perms = get_objects_for_user(
class SanityCheckMessages:
def __init__(self):
- self._messages = defaultdict(list)
+ self._messages: dict[int, list[dict]] = defaultdict(list)
self.has_error = False
self.has_warning = False
self.cf1 = CustomField.objects.create(name="cf1", data_type="text")
self.cf2 = CustomField.objects.create(name="cf2", data_type="text")
- @mock.patch("documents.serialisers.bulk_edit.set_correspondent")
- def test_api_set_correspondent(self, m):
- m.return_value = "OK"
+ @mock.patch("documents.bulk_edit.bulk_update_documents.delay")
+ def test_api_set_correspondent(self, bulk_update_task_mock):
+ self.assertNotEqual(self.doc1.correspondent, self.c1)
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
- m.assert_called_once()
- args, kwargs = m.call_args
- self.assertEqual(args[0], [self.doc1.id])
- self.assertEqual(kwargs["correspondent"], self.c1.id)
+ self.doc1.refresh_from_db()
+ self.assertEqual(self.doc1.correspondent, self.c1)
+ bulk_update_task_mock.assert_called_once_with(document_ids=[self.doc1.pk])
+
+ @mock.patch("documents.bulk_edit.bulk_update_documents.delay")
+ def test_api_unset_correspondent(self, bulk_update_task_mock):
+ self.doc1.correspondent = self.c1
+ self.doc1.save()
+ self.assertIsNotNone(self.doc1.correspondent)
- @mock.patch("documents.serialisers.bulk_edit.set_correspondent")
- def test_api_unset_correspondent(self, m):
- m.return_value = "OK"
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
- m.assert_called_once()
- args, kwargs = m.call_args
- self.assertEqual(args[0], [self.doc1.id])
- self.assertIsNone(kwargs["correspondent"])
+ bulk_update_task_mock.assert_called_once()
+ self.doc1.refresh_from_db()
+ self.assertIsNone(self.doc1.correspondent)
- @mock.patch("documents.serialisers.bulk_edit.set_document_type")
- def test_api_set_type(self, m):
- m.return_value = "OK"
+ @mock.patch("documents.bulk_edit.bulk_update_documents.delay")
+ def test_api_set_type(self, bulk_update_task_mock):
+ self.assertNotEqual(self.doc1.document_type, self.dt1)
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
- m.assert_called_once()
- args, kwargs = m.call_args
- self.assertEqual(args[0], [self.doc1.id])
- self.assertEqual(kwargs["document_type"], self.dt1.id)
+ self.doc1.refresh_from_db()
+ self.assertEqual(self.doc1.document_type, self.dt1)
+ bulk_update_task_mock.assert_called_once_with(document_ids=[self.doc1.pk])
+
+ @mock.patch("documents.bulk_edit.bulk_update_documents.delay")
+ def test_api_unset_type(self, bulk_update_task_mock):
+ self.doc1.document_type = self.dt1
+ self.doc1.save()
- @mock.patch("documents.serialisers.bulk_edit.set_document_type")
- def test_api_unset_type(self, m):
- m.return_value = "OK"
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
- m.assert_called_once()
- args, kwargs = m.call_args
- self.assertEqual(args[0], [self.doc1.id])
- self.assertIsNone(kwargs["document_type"])
+ self.doc1.refresh_from_db()
+ self.assertIsNone(self.doc1.document_type)
+ bulk_update_task_mock.assert_called_once_with(document_ids=[self.doc1.pk])
+
+ @mock.patch("documents.bulk_edit.bulk_update_documents.delay")
+ def test_api_add_tag(self, bulk_update_task_mock):
+
+ self.assertFalse(self.doc1.tags.filter(pk=self.t1.pk).exists())
- @mock.patch("documents.serialisers.bulk_edit.add_tag")
- def test_api_add_tag(self, m):
- m.return_value = "OK"
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
- m.assert_called_once()
- args, kwargs = m.call_args
- self.assertEqual(args[0], [self.doc1.id])
- self.assertEqual(kwargs["tag"], self.t1.id)
+ self.doc1.refresh_from_db()
+
+ self.assertTrue(self.doc1.tags.filter(pk=self.t1.pk).exists())
+
+ bulk_update_task_mock.assert_called_once_with(document_ids=[self.doc1.pk])
+
+ @mock.patch("documents.bulk_edit.bulk_update_documents.delay")
+ def test_api_remove_tag(self, bulk_update_task_mock):
+ self.doc1.tags.add(self.t1)
- @mock.patch("documents.serialisers.bulk_edit.remove_tag")
- def test_api_remove_tag(self, m):
- m.return_value = "OK"
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
- m.assert_called_once()
- args, kwargs = m.call_args
- self.assertEqual(args[0], [self.doc1.id])
- self.assertEqual(kwargs["tag"], self.t1.id)
+ self.doc1.refresh_from_db()
+ self.assertFalse(self.doc1.tags.filter(pk=self.t1.pk).exists())
@mock.patch("documents.serialisers.bulk_edit.modify_tags")
def test_api_modify_tags(self, m):
model = Correspondent
queryset = (
- Correspondent.objects.annotate(
+ Correspondent.objects.prefetch_related("documents")
+ .annotate(
last_correspondence=Max("documents__created"),
)
.select_related("owner")
)
def file_response(self, pk, request, disposition):
- doc = Document.objects.get(id=pk)
+ doc = Document.objects.select_related("owner").get(id=pk)
if request.user is not None and not has_perms_owner_aware(
request.user,
"view_document",
)
def metadata(self, request, pk=None):
try:
- doc = Document.objects.get(pk=pk)
+ doc = Document.objects.select_related("owner").get(pk=pk)
if request.user is not None and not has_perms_owner_aware(
request.user,
"view_document",
),
)
def suggestions(self, request, pk=None):
- doc = get_object_or_404(Document, pk=pk)
+ doc = get_object_or_404(Document.objects.select_related("owner"), pk=pk)
if request.user is not None and not has_perms_owner_aware(
request.user,
"view_document",
@method_decorator(last_modified(thumbnail_last_modified))
def thumb(self, request, pk=None):
try:
- doc = Document.objects.get(id=pk)
+ doc = Document.objects.select_related("owner").get(id=pk)
if request.user is not None and not has_perms_owner_aware(
request.user,
"view_document",
def getNotes(self, doc):
return [
{
- "id": c.id,
+ "id": c.pk,
"note": c.note,
"created": c.created,
"user": {
"last_name": c.user.last_name,
},
}
- for c in Note.objects.filter(document=doc).order_by("-created")
+ for c in Note.objects.select_related("user")
+ .only(
+ "pk",
+ "note",
+ "created",
+ "user__id",
+ "user__username",
+ "user__first_name",
+ "user__last_name",
+ )
+ .filter(document=doc)
+ .order_by("-created")
]
@action(methods=["get", "post", "delete"], detail=True)
def notes(self, request, pk=None):
+
currentUser = request.user
try:
- doc = Document.objects.get(pk=pk)
+ doc = (
+ Document.objects.select_related("owner")
+ .prefetch_related("notes")
+ .only("pk", "owner__id")
+ .get(pk=pk)
+ )
if currentUser is not None and not has_perms_owner_aware(
currentUser,
"view_document",
if request.method == "GET":
try:
- return Response(self.getNotes(doc))
+ notes = self.getNotes(doc)
+ return Response(notes)
except Exception as e:
logger.warning(f"An error occurred retrieving notes: {e!s}")
return Response(
note=request.data["note"],
user=currentUser,
)
- c.save()
# If audit log is enabled make an entry in the log
# about this note change
if settings.AUDIT_LOG_ENABLED:
from documents import index
- index.add_or_update_document(self.get_object())
+ index.add_or_update_document(doc)
+
+ notes = self.getNotes(doc)
- return Response(self.getNotes(doc))
+ return Response(notes)
except Exception as e:
logger.warning(f"An error occurred saving note: {e!s}")
return Response(
def share_links(self, request, pk=None):
currentUser = request.user
try:
- doc = Document.objects.get(pk=pk)
+ doc = Document.objects.select_related("owner").get(pk=pk)
if currentUser is not None and not has_perms_owner_aware(
currentUser,
"change_document",
now = timezone.now()
links = [
{
- "id": c.id,
+ "id": c.pk,
"created": c.created,
"expiration": c.expiration,
"slug": c.slug,
}
for c in ShareLink.objects.filter(document=doc)
+ .only("pk", "created", "expiration", "slug")
.exclude(expiration__lt=now)
.order_by("-created")
]
documents = serializer.validated_data.get("documents")
if not user.is_superuser:
- document_objs = Document.objects.filter(pk__in=documents)
+ document_objs = Document.objects.select_related("owner").filter(
+ pk__in=documents,
+ )
has_perms = (
all((doc.owner == user or doc.owner is None) for doc in document_objs)
if method
permission_classes = (IsAuthenticated,)
def get(self, request, format=None):
+
user = request.user if request.user is not None else None
documents = (
- Document.objects.all()
- if user is None
- else get_objects_for_user_owner_aware(
- user,
- "documents.view_document",
- Document,
+ (
+ Document.objects.all()
+ if user is None
+ else get_objects_for_user_owner_aware(
+ user,
+ "documents.view_document",
+ Document,
+ )
)
+ .only("mime_type", "content")
+ .prefetch_related("tags")
)
tags = (
Tag.objects.all()
correspondent_count = (
Correspondent.objects.count()
if user is None
- else len(
- get_objects_for_user_owner_aware(
- user,
- "documents.view_correspondent",
- Correspondent,
- ),
- )
+ else get_objects_for_user_owner_aware(
+ user,
+ "documents.view_correspondent",
+ Correspondent,
+ ).count()
)
document_type_count = (
DocumentType.objects.count()
if user is None
- else len(
- get_objects_for_user_owner_aware(
- user,
- "documents.view_documenttype",
- DocumentType,
- ),
- )
+ else get_objects_for_user_owner_aware(
+ user,
+ "documents.view_documenttype",
+ DocumentType,
+ ).count()
)
storage_path_count = (
StoragePath.objects.count()
if user is None
- else len(
- get_objects_for_user_owner_aware(
- user,
- "documents.view_storagepath",
- StoragePath,
- ),
- )
+ else get_objects_for_user_owner_aware(
+ user,
+ "documents.view_storagepath",
+ StoragePath,
+ ).count()
)
documents_total = documents.count()
with zipfile.ZipFile(temp.name, "w", compression) as zipf:
strategy = strategy_class(zipf, follow_filename_format)
- for id in ids:
- doc = Document.objects.get(id=id)
- strategy.add_document(doc)
+ for document in Document.objects.filter(pk__in=ids):
+ strategy.add_document(document)
with open(temp.name, "rb") as f:
response = HttpResponse(f, content_type="application/zip")
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
- user = User.objects.get(pk=request.user.id)
+ user = User.objects.select_related("ui_settings").get(pk=request.user.id)
ui_settings = {}
if hasattr(user, "ui_settings"):
ui_settings = user.ui_settings.settings
object_class = serializer.get_object_class(object_type)
operation = serializer.validated_data.get("operation")
- objs = object_class.objects.filter(pk__in=object_ids)
+ objs = object_class.objects.select_related("owner").filter(pk__in=object_ids)
if not user.is_superuser:
model_name = object_class._meta.verbose_name