]> git.ipfire.org Git - thirdparty/paperless-ngx.git/commitdiff
Switch src/documents/bulk*.py from os.path to pathlib.Path (#7862)
authorSebastian Steinbeißer <33968289+gothicVI@users.noreply.github.com>
Tue, 12 Nov 2024 17:04:07 +0000 (18:04 +0100)
committerGitHub <noreply@github.com>
Tue, 12 Nov 2024 17:04:07 +0000 (17:04 +0000)
Also:
* Ensure that the ruff PTH check remains enabled for these files and
all files added in the future.
* Add some type annotations.

.ruff.toml
src/documents/bulk_download.py
src/documents/bulk_edit.py

index 2ac983fe83767835e3ce543e7a5997c2b63e11d9..d805f3add3452468b3b0668d8755adbe0f593fe8 100644 (file)
@@ -31,17 +31,61 @@ extend-select = [
   "PLE",   # https://docs.astral.sh/ruff/rules/#pylint-pl
   "RUF",   # https://docs.astral.sh/ruff/rules/#ruff-specific-rules-ruf
   "FLY",   # https://docs.astral.sh/ruff/rules/#flynt-fly
+  "PTH",   # https://docs.astral.sh/ruff/rules/#flake8-use-pathlib-pth
 ]
-# TODO PTH https://docs.astral.sh/ruff/rules/#flake8-use-pathlib-pth
 ignore = ["DJ001", "SIM105", "RUF012"]
 
 [lint.per-file-ignores]
 ".github/scripts/*.py" = ["E501", "INP001", "SIM117"]
 "docker/wait-for-redis.py" = ["INP001", "T201"]
+"src/documents/barcodes.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/classifier.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/consumer.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/file_handling.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/index.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/management/commands/decrypt_documents.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/management/commands/document_consumer.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/management/commands/document_exporter.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/management/commands/document_importer.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/migrations/0012_auto_20160305_0040.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/migrations/0014_document_checksum.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/migrations/1003_mime_types.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/migrations/1012_fix_archive_files.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/migrations/1037_webp_encrypted_thumbnail_conversion.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/models.py" = ["SIM115", "PTH"]  # TODO PTH Enable & remove
+"src/documents/parsers.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/signals/handlers.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/tasks.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/tests/test_api_app_config.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/tests/test_api_bulk_download.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/tests/test_api_documents.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/tests/test_classifier.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/tests/test_consumer.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/tests/test_file_handling.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/tests/test_management.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/tests/test_management_consumer.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/tests/test_management_exporter.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/tests/test_management_thumbnails.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/tests/test_migration_archive_files.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/tests/test_migration_document_pages_count.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/tests/test_migration_mime_type.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/tests/test_sanity_check.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/tests/test_tasks.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/tests/test_views.py" = ["PTH"]  # TODO Enable & remove
+"src/documents/views.py" = ["PTH"]  # TODO Enable & remove
+"src/paperless/checks.py" = ["PTH"]  # TODO Enable & remove
+"src/paperless/settings.py" = ["PTH"]  # TODO Enable & remove
+"src/paperless/tests/test_checks.py" = ["PTH"]  # TODO Enable & remove
+"src/paperless/urls.py" = ["PTH"]  # TODO Enable & remove
+"src/paperless/views.py" = ["PTH"]  # TODO Enable & remove
+"src/paperless_mail/mail.py" = ["PTH"]  # TODO Enable & remove
+"src/paperless_mail/preprocessor.py" = ["PTH"]  # TODO Enable & remove
+"src/paperless_tesseract/parsers.py" = ["PTH"]  # TODO Enable & remove
+"src/paperless_tesseract/tests/test_parser.py" = ["RUF001", "PTH"]  # TODO PTH Enable & remove
+"src/paperless_tika/tests/test_live_tika.py" = ["PTH"]  # TODO Enable & remove
+"src/paperless_tika/tests/test_tika_parser.py" = ["PTH"]  # TODO Enable & remove
 "*/tests/*.py" = ["E501", "SIM117"]
 "*/migrations/*.py" = ["E501", "SIM", "T201"]
-"src/paperless_tesseract/tests/test_parser.py" = ["RUF001"]
-"src/documents/models.py" = ["SIM115"]
 
 [lint.isort]
 force-single-line = true
index ecabd4515cf7a677c486a817ce6cbab3e5d6ac61..25dfb5a14f8ea85d8c1bf452364ad2e3809793b7 100644 (file)
@@ -1,14 +1,21 @@
-import os
+from pathlib import Path
+from typing import TYPE_CHECKING
+from typing import NoReturn
 from zipfile import ZipFile
 
 from documents.models import Document
 
+if TYPE_CHECKING:
+    from collections.abc import Callable
+
 
 class BulkArchiveStrategy:
-    def __init__(self, zipf: ZipFile, follow_formatting: bool = False):
-        self.zipf = zipf
+    def __init__(self, zipf: ZipFile, follow_formatting: bool = False) -> None:
+        self.zipf: ZipFile = zipf
         if follow_formatting:
-            self.make_unique_filename = self._formatted_filepath
+            self.make_unique_filename: Callable[..., Path | str] = (
+                self._formatted_filepath
+            )
         else:
             self.make_unique_filename = self._filename_only
 
@@ -17,7 +24,7 @@ class BulkArchiveStrategy:
         doc: Document,
         archive: bool = False,
         folder: str = "",
-    ):
+    ) -> str:
         """
         Constructs a unique name for the given document to be used inside the
         zip file.
@@ -26,7 +33,7 @@ class BulkArchiveStrategy:
         """
         counter = 0
         while True:
-            filename = folder + doc.get_public_filename(archive, counter)
+            filename: str = folder + doc.get_public_filename(archive, counter)
             if filename in self.zipf.namelist():
                 counter += 1
             else:
@@ -37,7 +44,7 @@ class BulkArchiveStrategy:
         doc: Document,
         archive: bool = False,
         folder: str = "",
-    ):
+    ) -> Path:
         """
         Constructs a full file path for the given document to be used inside
         the zipfile.
@@ -45,24 +52,30 @@ class BulkArchiveStrategy:
         The path is already unique, as handled when a document is consumed or updated
         """
         if archive and doc.has_archive_version:
-            in_archive_path = os.path.join(folder, doc.archive_filename)
+            if TYPE_CHECKING:
+                assert doc.archive_filename is not None
+            in_archive_path: Path = Path(folder) / doc.archive_filename
         else:
-            in_archive_path = os.path.join(folder, doc.filename)
+            if TYPE_CHECKING:
+                assert doc.filename is not None
+            in_archive_path = Path(folder) / doc.filename
 
         return in_archive_path
 
-    def add_document(self, doc: Document):
+    def add_document(self, doc: Document) -> NoReturn:
         raise NotImplementedError  # pragma: no cover
 
 
 class OriginalsOnlyStrategy(BulkArchiveStrategy):
-    def add_document(self, doc: Document):
+    def add_document(self, doc: Document) -> None:
         self.zipf.write(doc.source_path, self.make_unique_filename(doc))
 
 
 class ArchiveOnlyStrategy(BulkArchiveStrategy):
-    def add_document(self, doc: Document):
+    def add_document(self, doc: Document) -> None:
         if doc.has_archive_version:
+            if TYPE_CHECKING:
+                assert doc.archive_path is not None
             self.zipf.write(
                 doc.archive_path,
                 self.make_unique_filename(doc, archive=True),
@@ -72,8 +85,10 @@ class ArchiveOnlyStrategy(BulkArchiveStrategy):
 
 
 class OriginalAndArchiveStrategy(BulkArchiveStrategy):
-    def add_document(self, doc: Document):
+    def add_document(self, doc: Document) -> None:
         if doc.has_archive_version:
+            if TYPE_CHECKING:
+                assert doc.archive_path is not None
             self.zipf.write(
                 doc.archive_path,
                 self.make_unique_filename(doc, archive=True, folder="archive/"),
index 2e3e5f591e6b3ebde754f5b535edd10523ec7469..4d03769c8e924575136f76e0b516aef970ed467f 100644 (file)
@@ -1,8 +1,9 @@
 import hashlib
 import itertools
 import logging
-import os
 import tempfile
+from pathlib import Path
+from typing import Literal
 
 from celery import chain
 from celery import chord
@@ -25,10 +26,13 @@ from documents.tasks import bulk_update_documents
 from documents.tasks import consume_file
 from documents.tasks import update_document_archive_file
 
-logger = logging.getLogger("paperless.bulk_edit")
+logger: logging.Logger = logging.getLogger("paperless.bulk_edit")
 
 
-def set_correspondent(doc_ids: list[int], correspondent):
+def set_correspondent(
+    doc_ids: list[int],
+    correspondent: Correspondent,
+) -> Literal["OK"]:
     if correspondent:
         correspondent = Correspondent.objects.only("pk").get(id=correspondent)
 
@@ -45,7 +49,7 @@ def set_correspondent(doc_ids: list[int], correspondent):
     return "OK"
 
 
-def set_storage_path(doc_ids: list[int], storage_path):
+def set_storage_path(doc_ids: list[int], storage_path: StoragePath) -> Literal["OK"]:
     if storage_path:
         storage_path = StoragePath.objects.only("pk").get(id=storage_path)
 
@@ -66,7 +70,7 @@ def set_storage_path(doc_ids: list[int], storage_path):
     return "OK"
 
 
-def set_document_type(doc_ids: list[int], document_type):
+def set_document_type(doc_ids: list[int], document_type: DocumentType) -> Literal["OK"]:
     if document_type:
         document_type = DocumentType.objects.only("pk").get(id=document_type)
 
@@ -83,7 +87,7 @@ def set_document_type(doc_ids: list[int], document_type):
     return "OK"
 
 
-def add_tag(doc_ids: list[int], tag: int):
+def add_tag(doc_ids: list[int], tag: int) -> Literal["OK"]:
     qs = Document.objects.filter(Q(id__in=doc_ids) & ~Q(tags__id=tag)).only("pk")
     affected_docs = list(qs.values_list("pk", flat=True))
 
@@ -98,7 +102,7 @@ def add_tag(doc_ids: list[int], tag: int):
     return "OK"
 
 
-def remove_tag(doc_ids: list[int], tag: int):
+def remove_tag(doc_ids: list[int], tag: int) -> Literal["OK"]:
     qs = Document.objects.filter(Q(id__in=doc_ids) & Q(tags__id=tag)).only("pk")
     affected_docs = list(qs.values_list("pk", flat=True))
 
@@ -113,7 +117,11 @@ def remove_tag(doc_ids: list[int], tag: int):
     return "OK"
 
 
-def modify_tags(doc_ids: list[int], add_tags: list[int], remove_tags: list[int]):
+def modify_tags(
+    doc_ids: list[int],
+    add_tags: list[int],
+    remove_tags: list[int],
+) -> Literal["OK"]:
     qs = Document.objects.filter(id__in=doc_ids).only("pk")
     affected_docs = list(qs.values_list("pk", flat=True))
 
@@ -137,7 +145,11 @@ def modify_tags(doc_ids: list[int], add_tags: list[int], remove_tags: list[int])
     return "OK"
 
 
-def modify_custom_fields(doc_ids: list[int], add_custom_fields, remove_custom_fields):
+def modify_custom_fields(
+    doc_ids: list[int],
+    add_custom_fields,
+    remove_custom_fields,
+) -> Literal["OK"]:
     qs = Document.objects.filter(id__in=doc_ids).only("pk")
     affected_docs = list(qs.values_list("pk", flat=True))
 
@@ -158,7 +170,7 @@ def modify_custom_fields(doc_ids: list[int], add_custom_fields, remove_custom_fi
 
 
 @shared_task
-def delete(doc_ids: list[int]):
+def delete(doc_ids: list[int]) -> Literal["OK"]:
     try:
         Document.objects.filter(id__in=doc_ids).delete()
 
@@ -177,7 +189,7 @@ def delete(doc_ids: list[int]):
     return "OK"
 
 
-def reprocess(doc_ids: list[int]):
+def reprocess(doc_ids: list[int]) -> Literal["OK"]:
     for document_id in doc_ids:
         update_document_archive_file.delay(
             document_id=document_id,
@@ -186,7 +198,12 @@ def reprocess(doc_ids: list[int]):
     return "OK"
 
 
-def set_permissions(doc_ids: list[int], set_permissions, owner=None, merge=False):
+def set_permissions(
+    doc_ids: list[int],
+    set_permissions,
+    owner=None,
+    merge=False,
+) -> Literal["OK"]:
     qs = Document.objects.filter(id__in=doc_ids).select_related("owner")
 
     if merge:
@@ -205,12 +222,12 @@ def set_permissions(doc_ids: list[int], set_permissions, owner=None, merge=False
     return "OK"
 
 
-def rotate(doc_ids: list[int], degrees: int):
+def rotate(doc_ids: list[int], degrees: int) -> Literal["OK"]:
     logger.info(
         f"Attempting to rotate {len(doc_ids)} documents by {degrees} degrees.",
     )
     qs = Document.objects.filter(id__in=doc_ids)
-    affected_docs = []
+    affected_docs: list[int] = []
     import pikepdf
 
     rotate_tasks = []
@@ -250,17 +267,17 @@ def merge(
     doc_ids: list[int],
     metadata_document_id: int | None = None,
     delete_originals: bool = False,
-    user: User = None,
-):
+    user: User | None = None,
+) -> Literal["OK"]:
     logger.info(
         f"Attempting to merge {len(doc_ids)} documents into a single document.",
     )
     qs = Document.objects.filter(id__in=doc_ids)
-    affected_docs = []
+    affected_docs: list[int] = []
     import pikepdf
 
     merged_pdf = pikepdf.new()
-    version = merged_pdf.pdf_version
+    version: str = merged_pdf.pdf_version
     # use doc_ids to preserve order
     for doc_id in doc_ids:
         doc = qs.get(id=doc_id)
@@ -277,9 +294,11 @@ def merge(
         logger.warning("No documents were merged")
         return "OK"
 
-    filepath = os.path.join(
-        tempfile.mkdtemp(dir=settings.SCRATCH_DIR),
-        f"{'_'.join([str(doc_id) for doc_id in doc_ids])[:100]}_merged.pdf",
+    filepath = (
+        Path(
+            tempfile.mkdtemp(dir=settings.SCRATCH_DIR),
+        )
+        / f"{'_'.join([str(doc_id) for doc_id in doc_ids])[:100]}_merged.pdf"
     )
     merged_pdf.remove_unreferenced_resources()
     merged_pdf.save(filepath, min_version=version)
@@ -288,8 +307,12 @@ def merge(
     if metadata_document_id:
         metadata_document = qs.get(id=metadata_document_id)
         if metadata_document is not None:
-            overrides = DocumentMetadataOverrides.from_document(metadata_document)
+            overrides: DocumentMetadataOverrides = (
+                DocumentMetadataOverrides.from_document(metadata_document)
+            )
             overrides.title = metadata_document.title + " (merged)"
+        else:
+            overrides = DocumentMetadataOverrides()
     else:
         overrides = DocumentMetadataOverrides()
 
@@ -321,8 +344,8 @@ def split(
     doc_ids: list[int],
     pages: list[list[int]],
     delete_originals: bool = False,
-    user: User = None,
-):
+    user: User | None = None,
+) -> Literal["OK"]:
     logger.info(
         f"Attempting to split document {doc_ids[0]} into {len(pages)} documents",
     )
@@ -334,18 +357,22 @@ def split(
     try:
         with pikepdf.open(doc.source_path) as pdf:
             for idx, split_doc in enumerate(pages):
-                dst = pikepdf.new()
+                dst: pikepdf.Pdf = pikepdf.new()
                 for page in split_doc:
                     dst.pages.append(pdf.pages[page - 1])
-                filepath = os.path.join(
-                    tempfile.mkdtemp(dir=settings.SCRATCH_DIR),
-                    f"{doc.id}_{split_doc[0]}-{split_doc[-1]}.pdf",
+                filepath: Path = (
+                    Path(
+                        tempfile.mkdtemp(dir=settings.SCRATCH_DIR),
+                    )
+                    / f"{doc.id}_{split_doc[0]}-{split_doc[-1]}.pdf"
                 )
                 dst.remove_unreferenced_resources()
                 dst.save(filepath)
                 dst.close()
 
-                overrides = DocumentMetadataOverrides().from_document(doc)
+                overrides: DocumentMetadataOverrides = (
+                    DocumentMetadataOverrides().from_document(doc)
+                )
                 overrides.title = f"{doc.title} (split {idx + 1})"
                 if user is not None:
                     overrides.owner_id = user.id
@@ -376,7 +403,7 @@ def split(
     return "OK"
 
 
-def delete_pages(doc_ids: list[int], pages: list[int]):
+def delete_pages(doc_ids: list[int], pages: list[int]) -> Literal["OK"]:
     logger.info(
         f"Attempting to delete pages {pages} from {len(doc_ids)} documents",
     )