"PLE", # https://docs.astral.sh/ruff/rules/#pylint-pl
"RUF", # https://docs.astral.sh/ruff/rules/#ruff-specific-rules-ruf
"FLY", # https://docs.astral.sh/ruff/rules/#flynt-fly
+ "PTH", # https://docs.astral.sh/ruff/rules/#flake8-use-pathlib-pth
]
-# TODO PTH https://docs.astral.sh/ruff/rules/#flake8-use-pathlib-pth
ignore = ["DJ001", "SIM105", "RUF012"]
[lint.per-file-ignores]
".github/scripts/*.py" = ["E501", "INP001", "SIM117"]
"docker/wait-for-redis.py" = ["INP001", "T201"]
+"src/documents/barcodes.py" = ["PTH"] # TODO Enable & remove
+"src/documents/classifier.py" = ["PTH"] # TODO Enable & remove
+"src/documents/consumer.py" = ["PTH"] # TODO Enable & remove
+"src/documents/file_handling.py" = ["PTH"] # TODO Enable & remove
+"src/documents/index.py" = ["PTH"] # TODO Enable & remove
+"src/documents/management/commands/decrypt_documents.py" = ["PTH"] # TODO Enable & remove
+"src/documents/management/commands/document_consumer.py" = ["PTH"] # TODO Enable & remove
+"src/documents/management/commands/document_exporter.py" = ["PTH"] # TODO Enable & remove
+"src/documents/management/commands/document_importer.py" = ["PTH"] # TODO Enable & remove
+"src/documents/migrations/0012_auto_20160305_0040.py" = ["PTH"] # TODO Enable & remove
+"src/documents/migrations/0014_document_checksum.py" = ["PTH"] # TODO Enable & remove
+"src/documents/migrations/1003_mime_types.py" = ["PTH"] # TODO Enable & remove
+"src/documents/migrations/1012_fix_archive_files.py" = ["PTH"] # TODO Enable & remove
+"src/documents/migrations/1037_webp_encrypted_thumbnail_conversion.py" = ["PTH"] # TODO Enable & remove
+"src/documents/models.py" = ["SIM115", "PTH"] # TODO PTH Enable & remove
+"src/documents/parsers.py" = ["PTH"] # TODO Enable & remove
+"src/documents/signals/handlers.py" = ["PTH"] # TODO Enable & remove
+"src/documents/tasks.py" = ["PTH"] # TODO Enable & remove
+"src/documents/tests/test_api_app_config.py" = ["PTH"] # TODO Enable & remove
+"src/documents/tests/test_api_bulk_download.py" = ["PTH"] # TODO Enable & remove
+"src/documents/tests/test_api_documents.py" = ["PTH"] # TODO Enable & remove
+"src/documents/tests/test_classifier.py" = ["PTH"] # TODO Enable & remove
+"src/documents/tests/test_consumer.py" = ["PTH"] # TODO Enable & remove
+"src/documents/tests/test_file_handling.py" = ["PTH"] # TODO Enable & remove
+"src/documents/tests/test_management.py" = ["PTH"] # TODO Enable & remove
+"src/documents/tests/test_management_consumer.py" = ["PTH"] # TODO Enable & remove
+"src/documents/tests/test_management_exporter.py" = ["PTH"] # TODO Enable & remove
+"src/documents/tests/test_management_thumbnails.py" = ["PTH"] # TODO Enable & remove
+"src/documents/tests/test_migration_archive_files.py" = ["PTH"] # TODO Enable & remove
+"src/documents/tests/test_migration_document_pages_count.py" = ["PTH"] # TODO Enable & remove
+"src/documents/tests/test_migration_mime_type.py" = ["PTH"] # TODO Enable & remove
+"src/documents/tests/test_sanity_check.py" = ["PTH"] # TODO Enable & remove
+"src/documents/tests/test_tasks.py" = ["PTH"] # TODO Enable & remove
+"src/documents/tests/test_views.py" = ["PTH"] # TODO Enable & remove
+"src/documents/views.py" = ["PTH"] # TODO Enable & remove
+"src/paperless/checks.py" = ["PTH"] # TODO Enable & remove
+"src/paperless/settings.py" = ["PTH"] # TODO Enable & remove
+"src/paperless/tests/test_checks.py" = ["PTH"] # TODO Enable & remove
+"src/paperless/urls.py" = ["PTH"] # TODO Enable & remove
+"src/paperless/views.py" = ["PTH"] # TODO Enable & remove
+"src/paperless_mail/mail.py" = ["PTH"] # TODO Enable & remove
+"src/paperless_mail/preprocessor.py" = ["PTH"] # TODO Enable & remove
+"src/paperless_tesseract/parsers.py" = ["PTH"] # TODO Enable & remove
+"src/paperless_tesseract/tests/test_parser.py" = ["RUF001", "PTH"] # TODO PTH Enable & remove
+"src/paperless_tika/tests/test_live_tika.py" = ["PTH"] # TODO Enable & remove
+"src/paperless_tika/tests/test_tika_parser.py" = ["PTH"] # TODO Enable & remove
"*/tests/*.py" = ["E501", "SIM117"]
"*/migrations/*.py" = ["E501", "SIM", "T201"]
-"src/paperless_tesseract/tests/test_parser.py" = ["RUF001"]
-"src/documents/models.py" = ["SIM115"]
[lint.isort]
force-single-line = true
-import os
+from pathlib import Path
+from typing import TYPE_CHECKING
+from typing import NoReturn
from zipfile import ZipFile
from documents.models import Document
+if TYPE_CHECKING:
+ from collections.abc import Callable
+
class BulkArchiveStrategy:
- def __init__(self, zipf: ZipFile, follow_formatting: bool = False):
- self.zipf = zipf
+ def __init__(self, zipf: ZipFile, follow_formatting: bool = False) -> None:
+ self.zipf: ZipFile = zipf
if follow_formatting:
- self.make_unique_filename = self._formatted_filepath
+ self.make_unique_filename: Callable[..., Path | str] = (
+ self._formatted_filepath
+ )
else:
self.make_unique_filename = self._filename_only
doc: Document,
archive: bool = False,
folder: str = "",
- ):
+ ) -> str:
"""
Constructs a unique name for the given document to be used inside the
zip file.
"""
counter = 0
while True:
- filename = folder + doc.get_public_filename(archive, counter)
+ filename: str = folder + doc.get_public_filename(archive, counter)
if filename in self.zipf.namelist():
counter += 1
else:
doc: Document,
archive: bool = False,
folder: str = "",
- ):
+ ) -> Path:
"""
Constructs a full file path for the given document to be used inside
the zipfile.
The path is already unique, as handled when a document is consumed or updated
"""
if archive and doc.has_archive_version:
- in_archive_path = os.path.join(folder, doc.archive_filename)
+ if TYPE_CHECKING:
+ assert doc.archive_filename is not None
+ in_archive_path: Path = Path(folder) / doc.archive_filename
else:
- in_archive_path = os.path.join(folder, doc.filename)
+ if TYPE_CHECKING:
+ assert doc.filename is not None
+ in_archive_path = Path(folder) / doc.filename
return in_archive_path
- def add_document(self, doc: Document):
+ def add_document(self, doc: Document) -> NoReturn:
raise NotImplementedError # pragma: no cover
class OriginalsOnlyStrategy(BulkArchiveStrategy):
- def add_document(self, doc: Document):
+ def add_document(self, doc: Document) -> None:
self.zipf.write(doc.source_path, self.make_unique_filename(doc))
class ArchiveOnlyStrategy(BulkArchiveStrategy):
- def add_document(self, doc: Document):
+ def add_document(self, doc: Document) -> None:
if doc.has_archive_version:
+ if TYPE_CHECKING:
+ assert doc.archive_path is not None
self.zipf.write(
doc.archive_path,
self.make_unique_filename(doc, archive=True),
class OriginalAndArchiveStrategy(BulkArchiveStrategy):
- def add_document(self, doc: Document):
+ def add_document(self, doc: Document) -> None:
if doc.has_archive_version:
+ if TYPE_CHECKING:
+ assert doc.archive_path is not None
self.zipf.write(
doc.archive_path,
self.make_unique_filename(doc, archive=True, folder="archive/"),
import hashlib
import itertools
import logging
-import os
import tempfile
+from pathlib import Path
+from typing import Literal
from celery import chain
from celery import chord
from documents.tasks import consume_file
from documents.tasks import update_document_archive_file
-logger = logging.getLogger("paperless.bulk_edit")
+logger: logging.Logger = logging.getLogger("paperless.bulk_edit")
-def set_correspondent(doc_ids: list[int], correspondent):
+def set_correspondent(
+ doc_ids: list[int],
+ correspondent: Correspondent,
+) -> Literal["OK"]:
if correspondent:
correspondent = Correspondent.objects.only("pk").get(id=correspondent)
return "OK"
-def set_storage_path(doc_ids: list[int], storage_path):
+def set_storage_path(doc_ids: list[int], storage_path: StoragePath) -> Literal["OK"]:
if storage_path:
storage_path = StoragePath.objects.only("pk").get(id=storage_path)
return "OK"
-def set_document_type(doc_ids: list[int], document_type):
+def set_document_type(doc_ids: list[int], document_type: DocumentType) -> Literal["OK"]:
if document_type:
document_type = DocumentType.objects.only("pk").get(id=document_type)
return "OK"
-def add_tag(doc_ids: list[int], tag: int):
+def add_tag(doc_ids: list[int], tag: int) -> Literal["OK"]:
qs = Document.objects.filter(Q(id__in=doc_ids) & ~Q(tags__id=tag)).only("pk")
affected_docs = list(qs.values_list("pk", flat=True))
return "OK"
-def remove_tag(doc_ids: list[int], tag: int):
+def remove_tag(doc_ids: list[int], tag: int) -> Literal["OK"]:
qs = Document.objects.filter(Q(id__in=doc_ids) & Q(tags__id=tag)).only("pk")
affected_docs = list(qs.values_list("pk", flat=True))
return "OK"
-def modify_tags(doc_ids: list[int], add_tags: list[int], remove_tags: list[int]):
+def modify_tags(
+ doc_ids: list[int],
+ add_tags: list[int],
+ remove_tags: list[int],
+) -> Literal["OK"]:
qs = Document.objects.filter(id__in=doc_ids).only("pk")
affected_docs = list(qs.values_list("pk", flat=True))
return "OK"
-def modify_custom_fields(doc_ids: list[int], add_custom_fields, remove_custom_fields):
+def modify_custom_fields(
+ doc_ids: list[int],
+ add_custom_fields,
+ remove_custom_fields,
+) -> Literal["OK"]:
qs = Document.objects.filter(id__in=doc_ids).only("pk")
affected_docs = list(qs.values_list("pk", flat=True))
@shared_task
-def delete(doc_ids: list[int]):
+def delete(doc_ids: list[int]) -> Literal["OK"]:
try:
Document.objects.filter(id__in=doc_ids).delete()
return "OK"
-def reprocess(doc_ids: list[int]):
+def reprocess(doc_ids: list[int]) -> Literal["OK"]:
for document_id in doc_ids:
update_document_archive_file.delay(
document_id=document_id,
return "OK"
-def set_permissions(doc_ids: list[int], set_permissions, owner=None, merge=False):
+def set_permissions(
+ doc_ids: list[int],
+ set_permissions,
+ owner=None,
+ merge=False,
+) -> Literal["OK"]:
qs = Document.objects.filter(id__in=doc_ids).select_related("owner")
if merge:
return "OK"
-def rotate(doc_ids: list[int], degrees: int):
+def rotate(doc_ids: list[int], degrees: int) -> Literal["OK"]:
logger.info(
f"Attempting to rotate {len(doc_ids)} documents by {degrees} degrees.",
)
qs = Document.objects.filter(id__in=doc_ids)
- affected_docs = []
+ affected_docs: list[int] = []
import pikepdf
rotate_tasks = []
doc_ids: list[int],
metadata_document_id: int | None = None,
delete_originals: bool = False,
- user: User = None,
-):
+ user: User | None = None,
+) -> Literal["OK"]:
logger.info(
f"Attempting to merge {len(doc_ids)} documents into a single document.",
)
qs = Document.objects.filter(id__in=doc_ids)
- affected_docs = []
+ affected_docs: list[int] = []
import pikepdf
merged_pdf = pikepdf.new()
- version = merged_pdf.pdf_version
+ version: str = merged_pdf.pdf_version
# use doc_ids to preserve order
for doc_id in doc_ids:
doc = qs.get(id=doc_id)
logger.warning("No documents were merged")
return "OK"
- filepath = os.path.join(
- tempfile.mkdtemp(dir=settings.SCRATCH_DIR),
- f"{'_'.join([str(doc_id) for doc_id in doc_ids])[:100]}_merged.pdf",
+ filepath = (
+ Path(
+ tempfile.mkdtemp(dir=settings.SCRATCH_DIR),
+ )
+ / f"{'_'.join([str(doc_id) for doc_id in doc_ids])[:100]}_merged.pdf"
)
merged_pdf.remove_unreferenced_resources()
merged_pdf.save(filepath, min_version=version)
if metadata_document_id:
metadata_document = qs.get(id=metadata_document_id)
if metadata_document is not None:
- overrides = DocumentMetadataOverrides.from_document(metadata_document)
+ overrides: DocumentMetadataOverrides = (
+ DocumentMetadataOverrides.from_document(metadata_document)
+ )
overrides.title = metadata_document.title + " (merged)"
+ else:
+ overrides = DocumentMetadataOverrides()
else:
overrides = DocumentMetadataOverrides()
doc_ids: list[int],
pages: list[list[int]],
delete_originals: bool = False,
- user: User = None,
-):
+ user: User | None = None,
+) -> Literal["OK"]:
logger.info(
f"Attempting to split document {doc_ids[0]} into {len(pages)} documents",
)
try:
with pikepdf.open(doc.source_path) as pdf:
for idx, split_doc in enumerate(pages):
- dst = pikepdf.new()
+ dst: pikepdf.Pdf = pikepdf.new()
for page in split_doc:
dst.pages.append(pdf.pages[page - 1])
- filepath = os.path.join(
- tempfile.mkdtemp(dir=settings.SCRATCH_DIR),
- f"{doc.id}_{split_doc[0]}-{split_doc[-1]}.pdf",
+ filepath: Path = (
+ Path(
+ tempfile.mkdtemp(dir=settings.SCRATCH_DIR),
+ )
+ / f"{doc.id}_{split_doc[0]}-{split_doc[-1]}.pdf"
)
dst.remove_unreferenced_resources()
dst.save(filepath)
dst.close()
- overrides = DocumentMetadataOverrides().from_document(doc)
+ overrides: DocumentMetadataOverrides = (
+ DocumentMetadataOverrides().from_document(doc)
+ )
overrides.title = f"{doc.title} (split {idx + 1})"
if user is not None:
overrides.owner_id = user.id
return "OK"
-def delete_pages(doc_ids: list[int], pages: list[int]):
+def delete_pages(doc_ids: list[int], pages: list[int]) -> Literal["OK"]:
logger.info(
f"Attempting to delete pages {pages} from {len(doc_ids)} documents",
)