-#!/usr/bin/env python3
import json
import logging
import os
will need their own logic
"""
- pass
-
def _main():
parser = ArgumentParser(
-#!/usr/bin/env python3
import logging
-#!/usr/bin/env python3
"""
This is a helper script for the multi-stage Docker image builder.
It provides a single point of configuration for package version control.
-#!/usr/bin/env python3
"""
This module contains some useful classes for interacting with the Github API.
The full documentation for the API can be found here: https://docs.github.com/en/rest
Returns True if the image has at least one tag which matches the given regex,
False otherwise
"""
- for tag in self.tags:
- if re.match(pattern, tag) is not None:
- return True
- return False
+ return any(re.match(pattern, tag) is not None for tag in self.tags)
def __repr__(self):
return f"Package {self.name}"
.venv/
/docker-compose.env
/docker-compose.yml
+.ruff_cache/
# Used for development
scripts/import-for-development
- markdown
exclude: "(^Pipfile\\.lock$)"
# Python hooks
- - repo: https://github.com/asottile/reorder_python_imports
- rev: v3.9.0
+ - repo: https://github.com/charliermarsh/ruff-pre-commit
+ rev: 'v0.0.259'
hooks:
- - id: reorder-python-imports
- exclude: "(migrations)"
- - repo: https://github.com/asottile/yesqa
- rev: "v1.4.0"
- hooks:
- - id: yesqa
- exclude: "(migrations)"
- - repo: https://github.com/asottile/add-trailing-comma
- rev: "v2.4.0"
- hooks:
- - id: add-trailing-comma
- exclude: "(migrations)"
- - repo: https://github.com/PyCQA/flake8
- rev: 6.0.0
- hooks:
- - id: flake8
- files: ^src/
- args:
- - "--config=./src/setup.cfg"
+ - id: ruff
- repo: https://github.com/psf/black
rev: 22.12.0
hooks:
- id: black
- - repo: https://github.com/asottile/pyupgrade
- rev: v3.3.1
- hooks:
- - id: pyupgrade
- exclude: "(migrations)"
- args:
- - "--py38-plus"
# Dockerfile hooks
- repo: https://github.com/AleksaC/hadolint-py
rev: v2.10.0
--- /dev/null
+# https://beta.ruff.rs/docs/settings/
+# https://beta.ruff.rs/docs/rules/
+select = ["F", "E", "W", "UP", "COM", "DJ", "EXE", "ISC", "ICN", "G201", "INP", "PIE", "RSE", "SIM", "TID", "PLC", "PLE", "RUF"]
+# TODO PTH
+ignore = ["DJ001", "SIM105"]
+fix = true
+line-length = 88
+respect-gitignore = true
+src = ["src"]
+target-version = "py38"
+format = "grouped"
+show-fixes = true
+
+[per-file-ignores]
+".github/scripts/*.py" = ["E501", "INP001", "SIM117"]
+"docker/wait-for-redis.py" = ["INP001"]
+"*/tests/*.py" = ["E501", "SIM117"]
+"*/migrations/*.py" = ["E501", "SIM"]
+"src/paperless_tesseract/tests/test_parser.py" = ["RUF001"]
+"src/documents/models.py" = ["SIM115"]
+
+[isort]
+force-single-line = true
pre-commit = "*"
imagehash = "*"
mkdocs-material = "*"
+ruff = "*"
[typing-dev]
mypy = "*"
"markers": "python_version >= '3.7' and python_version < '4'",
"version": "==2.28.2"
},
+ "ruff": {
+ "hashes": [
+ "sha256:22e1e35bf5f12072cd644d22afd9203641ccf258bc14ff91aa1c43dc14f6047d",
+ "sha256:29e2b77b7d5da6a7dd5cf9b738b511355c5734ece56f78e500d4b5bffd58c1a0",
+ "sha256:38704f151323aa5858370a2f792e122cc25e5d1aabe7d42ceeab83da18f0b456",
+ "sha256:40ae87f2638484b7e8a7567b04a7af719f1c484c5bf132038b702bb32e1f6577",
+ "sha256:428507fb321b386dda70d66cd1a8aa0abf51d7c197983d83bb9e4fa5ee60300b",
+ "sha256:49e903bcda19f6bb0725a962c058eb5d61f40d84ef52ed53b61939b69402ab4e",
+ "sha256:5b3c1beacf6037e7f0781d4699d9a2dd4ba2462f475be5b1f45cf84c4ba3c69d",
+ "sha256:71f0ef1985e9a6696fa97da8459917fa34bdaa2c16bd33bd5edead585b7d44f7",
+ "sha256:79b02fa17ec1fd8d306ae302cb47fb614b71e1f539997858243769bcbe78c6d9",
+ "sha256:7cfef26619cba184d59aa7fa17b48af5891d51fc0b755a9bc533478a10d4d066",
+ "sha256:8b56496063ab3bfdf72339a5fbebb8bd46e5c5fee25ef11a9f03b208fa0562ec",
+ "sha256:aa9449b898287e621942cc71b9327eceb8f0c357e4065fecefb707ef2d978df8",
+ "sha256:c5fbaea9167f1852757f02133e5daacdb8c75b3431343205395da5b10499927a",
+ "sha256:d2fb20e89e85d147c85caa807707a1488bccc1f3854dc3d53533e89b52a0c5ff",
+ "sha256:daaea322e7e85f4c13d82be9536309e1c4b8b9851bb0cbc7eeb15d490fd46bf9",
+ "sha256:e4f39e18702de69faaaee3969934b92d7467285627f99a5b6ecd55a7d9f5d086",
+ "sha256:f3938dc45e2a3f818e9cbd53007265c22246fbfded8837b2c563bf0ebde1a226"
+ ],
+ "index": "pypi",
+ "version": "==0.0.259"
+ },
"scipy": {
"hashes": [
"sha256:02b567e722d62bddd4ac253dafb01ce7ed8742cf8031aea030a41414b86c1125",
REDIS_URL: Final[str] = os.getenv("PAPERLESS_REDIS", "redis://localhost:6379")
- print(f"Waiting for Redis...", flush=True)
+ print("Waiting for Redis...", flush=True)
attempt = 0
with Redis.from_url(url=REDIS_URL) as client:
attempt += 1
if attempt >= MAX_RETRY_COUNT:
- print(f"Failed to connect to redis using environment variable PAPERLESS_REDIS.")
+ print("Failed to connect to redis using environment variable PAPERLESS_REDIS.")
sys.exit(os.EX_UNAVAILABLE)
else:
- print(f"Connected to Redis broker.")
+ print("Connected to Redis broker.")
sys.exit(os.EX_OK)
worker.log.info("worker received INT or QUIT signal")
## get traceback info
- import threading, sys, traceback
+ import sys
+ import threading
+ import traceback
id2name = {th.ident: th.name for th in threading.enumerate()}
code = []
filepath,
],
)
- with filepath.open("rb") as img_file:
- with newpath.open("wb") as pdf_file:
- pdf_file.write(img2pdf.convert(img_file))
+ with filepath.open("rb") as img_file, newpath.open("wb") as pdf_file:
+ pdf_file.write(img2pdf.convert(img_file))
return newpath
return in_archive_path
def add_document(self, doc: Document):
- raise NotImplementedError() # pragma: no cover
+ raise NotImplementedError # pragma: no cover
class OriginalsOnlyStrategy(BulkArchiveStrategy):
self.document_type_classifier = pickle.load(f)
self.storage_path_classifier = pickle.load(f)
except Exception as err:
- raise ClassifierModelCorruptError() from err
+ raise ClassifierModelCorruptError from err
# Check for the warning about unpickling from differing versions
# and consider it incompatible
if issubclass(warning.category, UserWarning):
w_msg = str(warning.message)
if sk_learn_warning_url in w_msg:
- raise IncompatibleClassifierVersionError()
+ raise IncompatibleClassifierVersionError
def save(self):
target_file = settings.MODEL_FILE
)
def _write(self, storage_type, source, target):
- with open(source, "rb") as read_file:
- with open(target, "wb") as write_file:
- write_file.write(read_file.read())
+ with open(source, "rb") as read_file, open(target, "wb") as write_file:
+ write_file.write(read_file.read())
def _log_script_outputs(self, completed_process: CompletedProcess):
"""
class DelayedQuery:
def _get_query(self):
- raise NotImplementedError()
+ raise NotImplementedError
def _get_query_filter(self):
criterias = []
new_size = stat_data.st_size
except FileNotFoundError:
logger.debug(
- f"File {file} moved while waiting for it to remain " f"unmodified.",
+ f"File {file} moved while waiting for it to remain unmodified.",
)
return
if new_mtime == mtime and new_size == size:
while not finished:
try:
for event in inotify.read(timeout=timeout):
- if recursive:
- path = inotify.get_path(event.wd)
- else:
- path = directory
+ path = inotify.get_path(event.wd) if recursive else directory
filepath = os.path.join(path, event.name)
notified_files[filepath] = monotonic()
from django.core.management.base import BaseCommand
-from ...tasks import train_classifier
+from documents.tasks import train_classifier
class Command(BaseCommand):
from paperless_mail.models import MailAccount
from paperless_mail.models import MailRule
-from ...file_handling import delete_empty_directories
-from ...file_handling import generate_filename
+from documents.file_handling import delete_empty_directories
+from documents.file_handling import generate_filename
class Command(BaseCommand):
if self.compare_checksums and source_checksum:
target_checksum = hashlib.md5(target.read_bytes()).hexdigest()
perform_copy = target_checksum != source_checksum
- elif source_stat.st_mtime != target_stat.st_mtime:
- perform_copy = True
- elif source_stat.st_size != target_stat.st_size:
+ elif (
+ source_stat.st_mtime != target_stat.st_mtime
+ or source_stat.st_size != target_stat.st_size
+ ):
perform_copy = True
else:
# Copy if it does not exist
from filelock import FileLock
from paperless import version
-from ...file_handling import create_source_path_directory
-from ...signals.handlers import update_filename_and_move_files
+from documents.file_handling import create_source_path_directory
+from documents.signals.handlers import update_filename_and_move_files
@contextmanager
post_save,
receiver=update_filename_and_move_files,
sender=Document,
+ ), disable_signal(
+ m2m_changed,
+ receiver=update_filename_and_move_files,
+ sender=Document.tags.through,
):
- with disable_signal(
- m2m_changed,
- receiver=update_filename_and_move_files,
- sender=Document.tags.through,
- ):
- # Fill up the database with whatever is in the manifest
- try:
- for manifest_path in manifest_paths:
- call_command("loaddata", manifest_path)
- except (FieldDoesNotExist, DeserializationError) as e:
- self.stdout.write(self.style.ERROR("Database import failed"))
- if (
- self.version is not None
- and self.version != version.__full_version_str__
- ):
- self.stdout.write(
- self.style.ERROR(
- "Version mismatch: "
- f"Currently {version.__full_version_str__},"
- f" importing {self.version}",
- ),
- )
- raise e
- else:
- self.stdout.write(
- self.style.ERROR("No version information present"),
- )
- raise e
+ # Fill up the database with whatever is in the manifest
+ try:
+ for manifest_path in manifest_paths:
+ call_command("loaddata", manifest_path)
+ except (FieldDoesNotExist, DeserializationError) as e:
+ self.stdout.write(self.style.ERROR("Database import failed"))
+ if (
+ self.version is not None
+ and self.version != version.__full_version_str__
+ ):
+ self.stdout.write(
+ self.style.ERROR(
+ "Version mismatch: "
+ f"Currently {version.__full_version_str__},"
+ f" importing {self.version}",
+ ),
+ )
+ raise e
+ else:
+ self.stdout.write(
+ self.style.ERROR("No version information present"),
+ )
+ raise e
- self._import_files_from_manifest(options["no_progress_bar"])
+ self._import_files_from_manifest(options["no_progress_bar"])
self.stdout.write("Updating search index...")
call_command(
def _check_manifest_exists(path):
if not os.path.exists(path):
raise CommandError(
- "That directory doesn't appear to contain a manifest.json " "file.",
+ "That directory doesn't appear to contain a manifest.json file.",
)
def _check_manifest(self):
for record in self.manifest:
- if not record["model"] == "documents.document":
+ if record["model"] != "documents.document":
continue
if EXPORTER_FILE_NAME not in record:
from documents.classifier import load_classifier
from documents.models import Document
-from ...signals.handlers import set_correspondent
-from ...signals.handlers import set_document_type
-from ...signals.handlers import set_storage_path
-from ...signals.handlers import set_tags
+from documents.signals.handlers import set_correspondent
+from documents.signals.handlers import set_document_type
+from documents.signals.handlers import set_storage_path
+from documents.signals.handlers import set_tags
logger = logging.getLogger("paperless.management.retagger")
from django.core.management.base import BaseCommand
from documents.models import Document
-from ...parsers import get_parser_class_for_mime_type
+from documents.parsers import get_parser_class_for_mime_type
def _process_document(doc_in):
def match_correspondents(document, classifier):
- if classifier:
- pred_id = classifier.predict_correspondent(document.content)
- else:
- pred_id = None
+ pred_id = classifier.predict_correspondent(document.content) if classifier else None
correspondents = Correspondent.objects.all()
def match_document_types(document, classifier):
- if classifier:
- pred_id = classifier.predict_document_type(document.content)
- else:
- pred_id = None
+ pred_id = classifier.predict_document_type(document.content) if classifier else None
document_types = DocumentType.objects.all()
def match_tags(document, classifier):
- if classifier:
- predicted_tag_ids = classifier.predict_tags(document.content)
- else:
- predicted_tag_ids = []
+ predicted_tag_ids = classifier.predict_tags(document.content) if classifier else []
tags = Tag.objects.all()
def match_storage_paths(document, classifier):
- if classifier:
- pred_id = classifier.predict_storage_path(document.content)
- else:
- pred_id = None
+ pred_id = classifier.predict_storage_path(document.content) if classifier else None
storage_paths = StoragePath.objects.all()
document_content = document.content
# Check that match is not empty
- if matching_model.match.strip() == "":
+ if not matching_model.match.strip():
return False
if matching_model.is_insensitive:
)
except re.error:
logger.error(
- f"Error while processing regular expression " f"{matching_model.match}",
+ f"Error while processing regular expression {matching_model.match}",
)
return False
if match:
-# -*- coding: utf-8 -*-
# Generated by Django 1.9 on 2015-12-20 19:10
-from __future__ import unicode_literals
from django.db import migrations, models
from django.conf import settings
models.TextField(
db_index=(
"mysql" not in settings.DATABASES["default"]["ENGINE"]
- )
+ ),
),
),
("created", models.DateTimeField(auto_now_add=True)),
-# -*- coding: utf-8 -*-
# Generated by Django 1.9 on 2015-12-26 13:16
-from __future__ import unicode_literals
from django.db import migrations, models
import django.utils.timezone
model_name="document",
name="created",
field=models.DateTimeField(
- default=django.utils.timezone.now, editable=False
+ default=django.utils.timezone.now,
+ editable=False,
),
),
]
-# -*- coding: utf-8 -*-
# Generated by Django 1.9 on 2016-01-11 12:21
-from __future__ import unicode_literals
from django.db import migrations, models
from django.template.defaultfilters import slugify
DOCUMENT_SENDER_MAP[document.pk],
created,
) = sender_model.objects.get_or_create(
- name=document.sender, defaults={"slug": slugify(document.sender)}
+ name=document.sender,
+ defaults={"slug": slugify(document.sender)},
)
-# -*- coding: utf-8 -*-
# Generated by Django 1.9 on 2016-01-14 18:44
-from __future__ import unicode_literals
from django.db import migrations, models
import django.db.models.deletion
-# -*- coding: utf-8 -*-
# Generated by Django 1.9 on 2016-01-23 03:13
-from __future__ import unicode_literals
from django.db import migrations
-# -*- coding: utf-8 -*-
# Generated by Django 1.9 on 2016-01-23 04:30
-from __future__ import unicode_literals
from django.db import migrations, models
-# -*- coding: utf-8 -*-
# Generated by Django 1.9 on 2016-01-26 21:14
-from __future__ import unicode_literals
from django.db import migrations, models
-# -*- coding: utf-8 -*-
# Generated by Django 1.9 on 2016-01-29 22:58
-from __future__ import unicode_literals
from django.db import migrations, models
model_name="document",
name="tags",
field=models.ManyToManyField(
- blank=True, related_name="documents", to="documents.Tag"
+ blank=True,
+ related_name="documents",
+ to="documents.Tag",
),
),
]
-# -*- coding: utf-8 -*-
# Generated by Django 1.9 on 2016-02-14 00:40
-from __future__ import unicode_literals
from django.db import migrations, models
-# -*- coding: utf-8 -*-
# Generated by Django 1.9 on 2016-02-27 17:54
-from __future__ import unicode_literals
from django.db import migrations, models
(
"component",
models.PositiveIntegerField(
- choices=[(1, "Consumer"), (2, "Mail Fetcher")]
+ choices=[(1, "Consumer"), (2, "Mail Fetcher")],
),
),
("created", models.DateTimeField(auto_now_add=True)),
-# -*- coding: utf-8 -*-
# Generated by Django 1.9.2 on 2016-03-03 19:29
-from __future__ import unicode_literals
from django.db import migrations
-# -*- coding: utf-8 -*-
# Generated by Django 1.9.2 on 2016-03-05 00:40
-from __future__ import unicode_literals
import gnupg
import os
from django.utils.termcolors import colorize as colourise # Spelling hurts me
-class GnuPG(object):
+class GnuPG:
"""
A handy singleton to use when handling encrypted files.
"""
@classmethod
def encrypted(cls, file_handle):
return cls.gpg.encrypt_file(
- file_handle, recipients=None, passphrase=settings.PASSPHRASE, symmetric=True
+ file_handle,
+ recipients=None,
+ passphrase=settings.PASSPHRASE,
+ symmetric=True,
).data
def move_documents_and_create_thumbnails(apps, schema_editor):
os.makedirs(
- os.path.join(settings.MEDIA_ROOT, "documents", "originals"), exist_ok=True
+ os.path.join(settings.MEDIA_ROOT, "documents", "originals"),
+ exist_ok=True,
)
os.makedirs(
- os.path.join(settings.MEDIA_ROOT, "documents", "thumbnails"), exist_ok=True
+ os.path.join(settings.MEDIA_ROOT, "documents", "thumbnails"),
+ exist_ok=True,
)
documents = os.listdir(os.path.join(settings.MEDIA_ROOT, "documents"))
" in order."
"\n",
opts=("bold",),
- )
+ ),
)
try:
colourise("*", fg="green"),
colourise("Generating a thumbnail for", fg="white"),
colourise(f, fg="cyan"),
- )
+ ),
)
thumb_temp = tempfile.mkdtemp(prefix="paperless", dir=settings.SCRATCH_DIR)
"remove",
orig_target,
os.path.join(thumb_temp, "convert-%04d.png"),
- )
+ ),
).wait()
thumb_source = os.path.join(thumb_temp, "convert-0000.png")
-# -*- coding: utf-8 -*-
# Generated by Django 1.9.4 on 2016-03-25 21:11
-from __future__ import unicode_literals
from django.db import migrations, models
import django.utils.timezone
-# -*- coding: utf-8 -*-
# Generated by Django 1.9.4 on 2016-03-28 19:09
-from __future__ import unicode_literals
import gnupg
import hashlib
from django.utils.termcolors import colorize as colourise # Spelling hurts me
-class GnuPG(object):
+class GnuPG:
"""
A handy singleton to use when handling encrypted files.
"""
@classmethod
def encrypted(cls, file_handle):
return cls.gpg.encrypt_file(
- file_handle, recipients=None, passphrase=settings.PASSPHRASE, symmetric=True
+ file_handle,
+ recipients=None,
+ passphrase=settings.PASSPHRASE,
+ symmetric=True,
).data
-class Document(object):
+class Document:
"""
Django's migrations restrict access to model methods, so this is a snapshot
of the methods that existed at the time this migration was written, since
def __str__(self):
created = self.created.strftime("%Y%m%d%H%M%S")
if self.correspondent and self.title:
- return "{}: {} - {}".format(created, self.correspondent, self.title)
+ return f"{created}: {self.correspondent} - {self.title}"
if self.correspondent or self.title:
- return "{}: {}".format(created, self.correspondent or self.title)
+ return f"{created}: {self.correspondent or self.title}"
return str(created)
@property
settings.MEDIA_ROOT,
"documents",
"originals",
- "{:07}.{}.gpg".format(self.pk, self.file_type),
+ f"{self.pk:07}.{self.file_type}.gpg",
)
@property
" order."
"\n",
opts=("bold",),
- )
+ ),
)
sums = {}
colourise("*", fg="green"),
colourise("Generating a checksum for", fg="white"),
colourise(document.file_name, fg="cyan"),
- )
+ ),
)
with document.source_file as encrypted:
fg="yellow",
),
doc1=colourise(
- " * {} (id: {})".format(sums[checksum][1], sums[checksum][0]),
+ f" * {sums[checksum][1]} (id: {sums[checksum][0]})",
fg="red",
),
doc2=colourise(
- " * {} (id: {})".format(document.file_name, document.pk), fg="red"
+ f" * {document.file_name} (id: {document.pk})",
+ fg="red",
),
code=colourise(
" $ echo 'DELETE FROM documents_document WHERE id = {pk};' | ./manage.py dbshell".format(
- pk=document.pk
+ pk=document.pk,
),
fg="green",
),
model_name="document",
name="created",
field=models.DateTimeField(
- db_index=True, default=django.utils.timezone.now
+ db_index=True,
+ default=django.utils.timezone.now,
),
),
migrations.AlterField(
-# -*- coding: utf-8 -*-
# Generated by Django 1.10.2 on 2016-10-05 21:38
-from __future__ import unicode_literals
from django.db import migrations, models
-# -*- coding: utf-8 -*-
# Generated by Django 1.10.5 on 2017-03-25 15:58
-from __future__ import unicode_literals
from django.db import migrations, models
from django.conf import settings
-# -*- coding: utf-8 -*-
# Generated by Django 1.10.5 on 2017-05-12 05:07
-from __future__ import unicode_literals
from django.db import migrations, models
-# -*- coding: utf-8 -*-
# Generated by Django 1.10.5 on 2017-07-15 17:12
-from __future__ import unicode_literals
from django.db import migrations, models
import django.db.models.deletion
-# -*- coding: utf-8 -*-
# Generated by Django 1.10.5 on 2017-07-15 17:12
-from __future__ import unicode_literals
from django.contrib.auth.models import User
from django.db import migrations
-# -*- coding: utf-8 -*-
-from __future__ import unicode_literals
-
from django.db import migrations, models
import django.utils.timezone
model_name="document",
name="added",
field=models.DateTimeField(
- db_index=True, default=django.utils.timezone.now, editable=False
+ db_index=True,
+ default=django.utils.timezone.now,
+ editable=False,
),
),
migrations.RunPython(set_added_time_to_created_time),
-# -*- coding: utf-8 -*-
# Generated by Django 1.11.10 on 2018-02-04 13:07
-from __future__ import unicode_literals
from django.db import migrations, models
def set_filename(apps, schema_editor):
Document = apps.get_model("documents", "Document")
for doc in Document.objects.all():
- file_name = "{:07}.{}".format(doc.pk, doc.file_type)
+ file_name = f"{doc.pk:07}.{doc.file_type}"
if doc.storage_type == "gpg":
file_name += ".gpg"
]
operations = [
- migrations.RunPython(migrations.RunPython.noop, migrations.RunPython.noop)
+ migrations.RunPython(migrations.RunPython.noop, migrations.RunPython.noop),
]
# Generated by Django 3.1.3 on 2020-11-20 11:21
-import mimetypes
import os
import magic
if self.filename:
fname = str(self.filename)
else:
- fname = "{:07}.{}".format(self.pk, self.file_type)
+ fname = f"{self.pk:07}.{self.file_type}"
if self.storage_type == STORAGE_TYPE_GPG:
fname += ".gpg"
(15, "Modified before"),
(16, "Modified after"),
(17, "Does not have tag"),
- ]
+ ],
),
),
("value", models.CharField(max_length=128)),
model_name="document",
name="created",
field=models.DateTimeField(
- db_index=True, default=django.utils.timezone.now, verbose_name="created"
+ db_index=True,
+ default=django.utils.timezone.now,
+ verbose_name="created",
),
),
migrations.AlterField(
model_name="document",
name="mime_type",
field=models.CharField(
- editable=False, max_length=256, verbose_name="mime type"
+ editable=False,
+ max_length=256,
+ verbose_name="mime type",
),
),
migrations.AlterField(
model_name="document",
name="modified",
field=models.DateTimeField(
- auto_now=True, db_index=True, verbose_name="modified"
+ auto_now=True,
+ db_index=True,
+ verbose_name="modified",
),
),
migrations.AlterField(
model_name="document",
name="title",
field=models.CharField(
- blank=True, db_index=True, max_length=128, verbose_name="title"
+ blank=True,
+ db_index=True,
+ max_length=128,
+ verbose_name="title",
),
),
migrations.AlterField(
model_name="savedviewfilterrule",
name="value",
field=models.CharField(
- blank=True, max_length=128, null=True, verbose_name="value"
+ blank=True,
+ max_length=128,
+ null=True,
+ verbose_name="value",
),
),
migrations.AlterField(
if doc.filename:
fname = archive_name_from_filename(doc.filename)
else:
- fname = "{:07}.pdf".format(doc.pk)
+ fname = f"{doc.pk:07}.pdf"
return os.path.join(settings.ARCHIVE_DIR, fname)
if doc.filename:
fname = str(doc.filename)
else:
- fname = "{:07}{}".format(doc.pk, doc.file_type)
+ fname = f"{doc.pk:07}{doc.file_type}"
if doc.storage_type == STORAGE_TYPE_GPG:
fname += ".gpg" # pragma: no cover
while True:
new_filename = generate_filename(
- doc, counter, archive_filename=archive_filename
+ doc,
+ counter,
+ archive_filename=archive_filename,
)
if new_filename == old_filename:
# still the same as before.
if doc.correspondent:
correspondent = pathvalidate.sanitize_filename(
- doc.correspondent.name, replacement_text="-"
+ doc.correspondent.name,
+ replacement_text="-",
)
else:
correspondent = "none"
if doc.document_type:
document_type = pathvalidate.sanitize_filename(
- doc.document_type.name, replacement_text="-"
+ doc.document_type.name,
+ replacement_text="-",
)
else:
document_type = "none"
document_type=document_type,
created=datetime.date.isoformat(doc.created),
created_year=doc.created.year if doc.created else "none",
- created_month=f"{doc.created.month:02}"
- if doc.created
- else "none", # NOQA: E501
+ created_month=f"{doc.created.month:02}" if doc.created else "none",
created_day=f"{doc.created.day:02}" if doc.created else "none",
added=datetime.date.isoformat(doc.added),
added_year=doc.added.year if doc.added else "none",
except (ValueError, KeyError, IndexError):
logger.warning(
f"Invalid PAPERLESS_FILENAME_FORMAT: "
- f"{settings.FILENAME_FORMAT}, falling back to default"
+ f"{settings.FILENAME_FORMAT}, falling back to default",
)
counter_str = f"_{counter:02}" if counter else ""
parser: DocumentParser = parser_class(None, None)
try:
parse_wrapper(
- parser, source_path(doc), doc.mime_type, os.path.basename(doc.filename)
+ parser,
+ source_path(doc),
+ doc.mime_type,
+ os.path.basename(doc.filename),
)
doc.content = parser.get_text()
if parser.get_archive_path() and os.path.isfile(parser.get_archive_path()):
doc.archive_filename = generate_unique_filename(
- doc, archive_filename=True
+ doc,
+ archive_filename=True,
)
with open(parser.get_archive_path(), "rb") as f:
doc.archive_checksum = hashlib.md5(f.read()).hexdigest()
doc.archive_checksum = None
logger.error(
f"Parser did not return an archive document for document "
- f"ID:{doc.id}. Removing archive document."
+ f"ID:{doc.id}. Removing archive document.",
)
doc.save()
return
logger.exception(
f"Unable to regenerate archive document for ID:{doc.id}. You "
f"need to invoke the document_archiver management command "
- f"manually for that document."
+ f"manually for that document.",
)
doc.archive_checksum = None
doc.save()
old_path = archive_path_old(doc)
if doc.id not in affected_document_ids and not os.path.isfile(old_path):
raise ValueError(
- f"Archived document ID:{doc.id} does not exist at: " f"{old_path}"
+ f"Archived document ID:{doc.id} does not exist at: {old_path}",
)
# check that we can regenerate affected archive versions
if not parser_class:
raise ValueError(
f"Document ID:{doc.id} has an invalid archived document, "
- f"but no parsers are available. Cannot migrate."
+ f"but no parsers are available. Cannot migrate.",
)
for doc in Document.objects.filter(archive_checksum__isnull=False):
# Set archive path for unaffected files
doc.archive_filename = archive_name_from_filename(doc.filename)
Document.objects.filter(id=doc.id).update(
- archive_filename=doc.archive_filename
+ archive_filename=doc.archive_filename,
)
# regenerate archive documents
raise ValueError(
f"Cannot migrate: Archive file name {old_archive_path} of "
f"document {doc.filename} would clash with another archive "
- f"filename."
+ f"filename.",
)
old_archive_paths.add(old_archive_path)
if new_archive_path != old_archive_path and os.path.isfile(old_archive_path):
raise ValueError(
f"Cannot migrate: Cannot move {new_archive_path} to "
- f"{old_archive_path}: file already exists."
+ f"{old_archive_path}: file already exists.",
)
for doc in Document.objects.filter(archive_checksum__isnull=False):
model_name="tag",
name="color",
field=models.CharField(
- default="#a6cee3", max_length=7, verbose_name="color"
+ default="#a6cee3",
+ max_length=7,
+ verbose_name="color",
),
),
migrations.RunPython(forward, reverse),
]
operations = [
- migrations.RunPython(remove_null_characters, migrations.RunPython.noop)
+ migrations.RunPython(remove_null_characters, migrations.RunPython.noop),
]
model_name="savedview",
name="sort_field",
field=models.CharField(
- blank=True, max_length=128, null=True, verbose_name="sort field"
+ blank=True,
+ max_length=128,
+ null=True,
+ verbose_name="sort field",
),
),
migrations.AlterField(
model_name="savedviewfilterrule",
name="value",
field=models.CharField(
- blank=True, max_length=255, null=True, verbose_name="value"
+ blank=True,
+ max_length=255,
+ null=True,
+ verbose_name="value",
),
),
]
# Drop the django-q tables entirely
# Must be done last or there could be references here
migrations.RunSQL(
- "DROP TABLE IF EXISTS django_q_ormq", reverse_sql=migrations.RunSQL.noop
+ "DROP TABLE IF EXISTS django_q_ormq",
+ reverse_sql=migrations.RunSQL.noop,
),
migrations.RunSQL(
- "DROP TABLE IF EXISTS django_q_schedule", reverse_sql=migrations.RunSQL.noop
+ "DROP TABLE IF EXISTS django_q_schedule",
+ reverse_sql=migrations.RunSQL.noop,
),
migrations.RunSQL(
- "DROP TABLE IF EXISTS django_q_task", reverse_sql=migrations.RunSQL.noop
+ "DROP TABLE IF EXISTS django_q_task",
+ reverse_sql=migrations.RunSQL.noop,
),
]
migrations.AddConstraint(
model_name="storagepath",
constraint=models.UniqueConstraint(
- fields=("name", "owner"), name="documents_storagepath_unique_name_owner"
+ fields=("name", "owner"),
+ name="documents_storagepath_unique_name_owner",
),
),
migrations.AddConstraint(
migrations.AddConstraint(
model_name="tag",
constraint=models.UniqueConstraint(
- fields=("name", "owner"), name="documents_tag_unique_name_owner"
+ fields=("name", "owner"),
+ name="documents_tag_unique_name_owner",
),
),
migrations.AddConstraint(
model_name="note",
name="note",
field=models.TextField(
- blank=True, help_text="Note for the document", verbose_name="content"
+ blank=True,
+ help_text="Note for the document",
+ verbose_name="content",
),
),
migrations.AlterField(
MinValueValidator(ARCHIVE_SERIAL_NUMBER_MIN),
],
help_text=_(
- "The position of this document in your physical document " "archive.",
+ "The position of this document in your physical document archive.",
),
)
verbose_name = _("filter rule")
verbose_name_plural = _("filter rules")
+ def __str__(self) -> str:
+ return f"SavedViewFilterRule: {self.rule_type} : {self.value}"
+
# TODO: why is this in the models file?
# TODO: how about, what is this and where is it documented?
(
"created-title",
re.compile(
- r"^(?P<created>\d{8}(\d{6})?Z) - " r"(?P<title>.*)$",
+ r"^(?P<created>\d{8}(\d{6})?Z) - (?P<title>.*)$",
flags=re.IGNORECASE,
),
),
),
)
+ def __str__(self) -> str:
+ return f"Task {self.task_id}"
+
class Note(models.Model):
note = models.TextField(
return []
def parse(self, document_path, mime_type, file_name=None):
- raise NotImplementedError()
+ raise NotImplementedError
def get_archive_path(self):
return self.archive_path
"""
Returns the path to a file we can use as a thumbnail for this document.
"""
- raise NotImplementedError()
+ raise NotImplementedError
def get_text(self):
return self.text
except OSError as e:
messages.error(doc.pk, f"Cannot read original file of document: {e}")
else:
- if not checksum == doc.checksum:
+ if checksum != doc.checksum:
messages.error(
doc.pk,
"Checksum mismatch. "
f"Cannot read archive file of document : {e}",
)
else:
- if not checksum == doc.archive_checksum:
+ if checksum != doc.archive_checksum:
messages.error(
doc.pk,
"Checksum mismatch of archived document. "
try:
import zoneinfo
except ImportError:
- import backports.zoneinfo as zoneinfo
+ from backports import zoneinfo
import magic
from django.conf import settings
from django.utils.text import slugify
class OwnedObjectSerializer(serializers.ModelSerializer, SetPermissionsMixin):
def __init__(self, *args, **kwargs):
self.user = kwargs.pop("user", None)
- return super().__init__(*args, **kwargs)
+ super().__init__(*args, **kwargs)
def get_permissions(self, obj):
view_codename = f"view_{obj.__class__.__name__.lower()}"
for id, color in self.COLOURS:
if id == data:
return color
- raise serializers.ValidationError()
+ raise serializers.ValidationError
def to_representation(self, value):
for id, color in self.COLOURS:
def _validate_document_id_list(self, documents, name="documents"):
if not type(documents) == list:
raise serializers.ValidationError(f"{name} must be a list")
- if not all([type(i) == int for i in documents]):
+ if not all(type(i) == int for i in documents):
raise serializers.ValidationError(f"{name} must be a list of integers")
count = Document.objects.filter(id__in=documents).count()
if not count == len(documents):
raise serializers.ValidationError(
- f"Some documents in {name} don't exist or were " f"specified twice.",
+ f"Some documents in {name} don't exist or were specified twice.",
)
def validate_documents(self, documents):
def _validate_tag_id_list(self, tags, name="tags"):
if not type(tags) == list:
raise serializers.ValidationError(f"{name} must be a list")
- if not all([type(i) == int for i in tags]):
+ if not all(type(i) == int for i in tags):
raise serializers.ValidationError(f"{name} must be a list of integers")
count = Tag.objects.filter(id__in=tags).count()
if not count == len(tags):
original_name="testfile",
)
- except (KeyError):
- raise serializers.ValidationError(_("Invalid variable detected."))
+ except KeyError as err:
+ raise serializers.ValidationError(_("Invalid variable detected.")) from err
return path
pass
if not type(tasks) == list:
raise serializers.ValidationError(f"{name} must be a list")
- if not all([type(i) == int for i in tasks]):
+ if not all(type(i) == int for i in tasks):
raise serializers.ValidationError(f"{name} must be a list of integers")
count = PaperlessTask.objects.filter(id__in=tasks).count()
if not count == len(tasks):
from django.utils import timezone
from filelock import FileLock
-from .. import matching
-from ..file_handling import create_source_path_directory
-from ..file_handling import delete_empty_directories
-from ..file_handling import generate_unique_filename
-from ..models import Document
-from ..models import MatchingModel
-from ..models import PaperlessTask
-from ..models import Tag
+from documents import matching
+from documents.file_handling import create_source_path_directory
+from documents.file_handling import delete_empty_directories
+from documents.file_handling import generate_unique_filename
+from documents.models import Document
+from documents.models import MatchingModel
+from documents.models import PaperlessTask
+from documents.models import Tag
logger = logging.getLogger("paperless.handlers")
potential_correspondents = matching.match_correspondents(document, classifier)
potential_count = len(potential_correspondents)
- if potential_correspondents:
- selected = potential_correspondents[0]
- else:
- selected = None
+ selected = potential_correspondents[0] if potential_correspondents else None
if potential_count > 1:
if use_first:
logger.debug(
potential_document_type = matching.match_document_types(document, classifier)
potential_count = len(potential_document_type)
- if potential_document_type:
- selected = potential_document_type[0]
- else:
- selected = None
+ selected = potential_document_type[0] if potential_document_type else None
if potential_count > 1:
if use_first:
)
potential_count = len(potential_storage_path)
- if potential_storage_path:
- selected = potential_storage_path[0]
- else:
- selected = None
+ selected = potential_storage_path[0] if potential_storage_path else None
if potential_count > 1:
if use_first:
if not os.path.isfile(old_path):
# Can't do anything if the old file does not exist anymore.
logger.fatal(f"Document {str(instance)}: File {old_path} has gone.")
- raise CannotMoveFilesException()
+ raise CannotMoveFilesException
if os.path.isfile(new_path):
# Can't do anything if the new file already exists. Skip updating file.
f"Document {str(instance)}: Cannot rename file "
f"since target path {new_path} already exists.",
)
- raise CannotMoveFilesException()
+ raise CannotMoveFilesException
@receiver(models.signals.m2m_changed, sender=Document.tags.through)
date_started=None,
date_done=None,
)
- except Exception as e: # pragma: no cover
+ except Exception: # pragma: no cover
# Don't let an exception in the signal handlers prevent
# a document from being consumed.
- logger.error(f"Creating PaperlessTask failed: {e}", exc_info=True)
+ logger.exception("Creating PaperlessTask failed")
@task_prerun.connect
task_instance.status = states.STARTED
task_instance.date_started = timezone.now()
task_instance.save()
- except Exception as e: # pragma: no cover
+ except Exception: # pragma: no cover
# Don't let an exception in the signal handlers prevent
# a document from being consumed.
- logger.error(f"Setting PaperlessTask started failed: {e}", exc_info=True)
+ logger.exception("Setting PaperlessTask started failed")
@task_postrun.connect
def task_postrun_handler(
- sender=None, task_id=None, task=None, retval=None, state=None, **kwargs
+ sender=None,
+ task_id=None,
+ task=None,
+ retval=None,
+ state=None,
+ **kwargs,
):
"""
Updates the result of the PaperlessTask.
task_instance.result = retval
task_instance.date_done = timezone.now()
task_instance.save()
- except Exception as e: # pragma: no cover
+ except Exception: # pragma: no cover
# Don't let an exception in the signal handlers prevent
# a document from being consumed.
- logger.error(f"Updating PaperlessTask failed: {e}", exc_info=True)
+ logger.exception("Updating PaperlessTask failed")
except Exception:
logger.exception(
- f"Error while parsing document {document} " f"(ID: {document_id})",
+ f"Error while parsing document {document} (ID: {document_id})",
)
finally:
parser.cleanup()
from factory import Faker
from factory.django import DjangoModelFactory
-from ..models import Correspondent
-from ..models import Document
+from documents.models import Correspondent
+from documents.models import Document
class CorrespondentFactory(DjangoModelFactory):
try:
import zoneinfo
except ImportError:
- import backports.zoneinfo as zoneinfo
+ from backports import zoneinfo
import pytest
from django.conf import settings
def test_document_fields(self):
c = Correspondent.objects.create(name="c", pk=41)
dt = DocumentType.objects.create(name="dt", pk=63)
- tag = Tag.objects.create(name="t", pk=85)
+ Tag.objects.create(name="t", pk=85)
storage_path = StoragePath.objects.create(name="sp", pk=77, path="p")
- doc = Document.objects.create(
+ Document.objects.create(
title="WOW",
content="the content",
correspondent=c,
added=timezone.make_aware(datetime.datetime(2020, 7, 13)),
content="test",
)
- d6 = Document.objects.create(checksum="6", content="test2")
+ Document.objects.create(checksum="6", content="test2")
d7 = Document.objects.create(checksum="7", storage_path=sp, content="test")
with AsyncWriter(index.open_index()) as writer:
mime_type="application/pdf",
content="abc",
)
- doc2 = Document.objects.create(
+ Document.objects.create(
title="none2",
checksum="B",
mime_type="application/pdf",
content="123",
)
- doc3 = Document.objects.create(
+ Document.objects.create(
title="none3",
checksum="C",
mime_type="text/plain",
show_on_dashboard=False,
show_in_sidebar=False,
)
- v2 = SavedView.objects.create(
+ SavedView.objects.create(
owner=u2,
name="test2",
sort_field="",
show_on_dashboard=False,
show_in_sidebar=False,
)
- v3 = SavedView.objects.create(
+ SavedView.objects.create(
owner=u2,
name="test3",
sort_field="",
def test_create_update_patch(self):
- u1 = User.objects.create_user("user1")
+ User.objects.create_user("user1")
view = {
"name": "test",
self.assertEqual(f.read(), zipf.read("2021-01-01 document A_01.pdf"))
def test_compression(self):
- response = self.client.post(
+ self.client.post(
self.ENDPOINT,
json.dumps(
{"documents": [self.doc2.id, self.doc2b.id], "compression": "lzma"},
user = User.objects.create_user(username="test")
self.client.force_authenticate(user)
- d = Document.objects.create(title="Test")
+ Document.objects.create(title="Test")
self.assertEqual(
self.client.get("/api/documents/").status_code,
user.user_permissions.add(*Permission.objects.all())
self.client.force_authenticate(user)
- d = Document.objects.create(title="Test")
+ Document.objects.create(title="Test")
self.assertEqual(
self.client.get("/api/documents/").status_code,
THEN:
- No task data is returned
"""
- task1 = PaperlessTask.objects.create(
+ PaperlessTask.objects.create(
task_id=str(uuid.uuid4()),
task_file_name="task_one.pdf",
)
THEN:
- The returned data includes the task result
"""
- task = PaperlessTask.objects.create(
+ PaperlessTask.objects.create(
task_id=str(uuid.uuid4()),
task_file_name="task_one.pdf",
status=celery.states.SUCCESS,
THEN:
- The returned result is the exception info
"""
- task = PaperlessTask.objects.create(
+ PaperlessTask.objects.create(
task_id=str(uuid.uuid4()),
task_file_name="task_one.pdf",
status=celery.states.FAILURE,
THEN:
- Returned data include the filename
"""
- task = PaperlessTask.objects.create(
+ PaperlessTask.objects.create(
task_id=str(uuid.uuid4()),
task_file_name="test.pdf",
task_name="documents.tasks.some_task",
THEN:
- Returned data include the filename
"""
- task = PaperlessTask.objects.create(
+ PaperlessTask.objects.create(
task_id=str(uuid.uuid4()),
task_file_name="anothertest.pdf",
task_name="documents.tasks.some_task",
-import os
import shutil
from pathlib import Path
from unittest import mock
+import platform
import pytest
from django.conf import settings
from documents import tasks
from documents.consumer import ConsumerError
from documents.data_models import ConsumableDocument
-from documents.data_models import DocumentMetadataOverrides
from documents.data_models import DocumentSource
from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import FileSystemAssertsMixin
from PIL import Image
-try:
- import zxingcpp
-
- ZXING_AVAILIBLE = True
-except ImportError:
- ZXING_AVAILIBLE = False
-
@override_settings(CONSUMER_BARCODE_SCANNER="PYZBAR")
class TestBarcode(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
self.assertDictEqual(separator_page_numbers, {})
@override_settings(CONSUMER_BARCODE_STRING="ADAR-NEXTDOC")
- def test_scan_file_for_separating_qr_barcodes(self):
+ def test_scan_file_qr_barcodes_was_problem(self):
"""
GIVEN:
- Input PDF with certain QR codes that aren't detected at current size
@pytest.mark.skipif(
- not ZXING_AVAILIBLE,
+ platform.machine().upper() not in {"AMD64"},
reason="No zxingcpp",
)
@override_settings(CONSUMER_BARCODE_SCANNER="ZXING")
@pytest.mark.skipif(
- not ZXING_AVAILIBLE,
+ platform.machine().upper() not in {"AMD64"},
reason="No zxingcpp",
)
@override_settings(CONSUMER_BARCODE_SCANNER="ZXING")
# rebuilding the file and committing that. Not developer friendly
# Need to rethink how to pass the load through to a file with a single
# old model?
- pass
def test_one_correspondent_predict(self):
c1 = Correspondent.objects.create(
self.assertListEqual(self.classifier.predict_tags(doc1.content), [t1.pk])
def test_one_tag_predict_unassigned(self):
- t1 = Tag.objects.create(name="t1", matching_algorithm=Tag.MATCH_AUTO, pk=12)
+ Tag.objects.create(name="t1", matching_algorithm=Tag.MATCH_AUTO, pk=12)
doc1 = Document.objects.create(
title="doc1",
self.assertIsNotNone(classifier)
with mock.patch("documents.classifier.DocumentClassifier.load") as load:
- classifier2 = load_classifier()
+ load_classifier()
load.assert_not_called()
@mock.patch("documents.classifier.DocumentClassifier.load")
try:
import zoneinfo
except ImportError:
- import backports.zoneinfo as zoneinfo
+ from backports import zoneinfo
from django.conf import settings
from django.utils import timezone
from django.test import override_settings
from django.test import TestCase
-from ..consumer import Consumer
-from ..consumer import ConsumerError
-from ..models import Correspondent
-from ..models import Document
-from ..models import DocumentType
-from ..models import FileInfo
-from ..models import Tag
-from ..parsers import DocumentParser
-from ..parsers import ParseError
-from ..tasks import sanity_check
+from documents.consumer import Consumer
+from documents.consumer import ConsumerError
+from documents.models import Correspondent
+from documents.models import Document
+from documents.models import DocumentType
+from documents.models import FileInfo
+from documents.models import Tag
+from documents.parsers import DocumentParser
+from documents.parsers import ParseError
+from documents.tasks import sanity_check
from .utils import DirectoriesMixin
from documents.tests.utils import FileSystemAssertsMixin
"20150102030405Z",
"20150102Z",
)
- valid_correspondents = ["timmy", "Dr. McWheelie", "Dash Gor-don", "ο Θερμαστής", ""]
- valid_titles = ["title", "Title w Spaces", "Title a-dash", "Τίτλος", ""]
+ valid_correspondents = ["timmy", "Dr. McWheelie", "Dash Gor-don", "o Θεpμaoτής", ""]
+ valid_titles = ["title", "Title w Spaces", "Title a-dash", "Tίτλoς", ""]
valid_tags = ["tag", "tig,tag", "tag1,tag2,tag-3"]
def _test_guessed_attributes(
filename = "tag1,tag2_20190908_180610_0001.pdf"
all_patt = re.compile("^.*$")
none_patt = re.compile("$a")
- exact_patt = re.compile("^([a-z0-9,]+)_(\\d{8})_(\\d{6})_([0-9]+)\\.")
- repl1 = " - \\4 - \\1." # (empty) corrspondent, title and tags
- repl2 = "\\2Z - " + repl1 # creation date + repl1
+ re.compile("^([a-z0-9,]+)_(\\d{8})_(\\d{6})_([0-9]+)\\.")
# No transformations configured (= default)
info = FileInfo.from_filename(filename)
class DummyParser(DocumentParser):
- def get_thumbnail(self, document_path, mime_type, file_name=None):
- # not important during tests
- raise NotImplementedError()
-
def __init__(self, logging_group, scratch_dir, archive_path):
super().__init__(logging_group, None)
_, self.fake_thumb = tempfile.mkstemp(suffix=".webp", dir=scratch_dir)
def get_thumbnail(self, document_path, mime_type, file_name=None):
return self.fake_thumb
- def get_thumbnail(self, document_path, mime_type, file_name=None):
- return self.fake_thumb
-
def __init__(self, logging_group, progress_callback=None):
super().__init__(logging_group, progress_callback)
_, self.fake_thumb = tempfile.mkstemp(suffix=".webp", dir=self.tempdir)
class FaultyParser(DocumentParser):
- def get_thumbnail(self, document_path, mime_type, file_name=None):
- # not important during tests
- raise NotImplementedError()
-
def __init__(self, logging_group, scratch_dir):
super().__init__(logging_group)
_, self.fake_thumb = tempfile.mkstemp(suffix=".webp", dir=scratch_dir)
)
def test_date_format_5(self):
- text = "lorem ipsum 130218, 2018, 20180213 and lorem 13.02.2018 lorem " "ipsum"
+ text = "lorem ipsum 130218, 2018, 20180213 and lorem 13.02.2018 lorem ipsum"
date = parse_date("", text)
self.assertEqual(
date,
self.assertEqual(parse_date("", text), None)
def test_date_format_7(self):
- text = "lorem ipsum\n" "März 2019\n" "lorem ipsum"
+ text = "lorem ipsum\nMärz 2019\nlorem ipsum"
date = parse_date("", text)
self.assertEqual(
date,
@override_settings(SCRATCH_DIR=SCRATCH)
def test_date_format_9(self):
- text = "lorem ipsum\n" "27. Nullmonth 2020\n" "März 2020\n" "lorem ipsum"
+ text = "lorem ipsum\n27. Nullmonth 2020\nMärz 2020\nlorem ipsum"
self.assertEqual(
parse_date("", text),
datetime.datetime(2020, 3, 1, 0, 0, tzinfo=tz.gettz(settings.TIME_ZONE)),
THEN:
- Should parse the date non-ignored date from content
"""
- text = "lorem ipsum 110319, 20200117 and lorem 13.02.2018 lorem " "ipsum"
+ text = "lorem ipsum 110319, 20200117 and lorem 13.02.2018 lorem ipsum"
self.assertEqual(
parse_date("", text),
datetime.datetime(2018, 2, 13, 0, 0, tzinfo=tz.gettz(settings.TIME_ZONE)),
THEN:
- Should parse the date non-ignored date from content
"""
- text = "lorem ipsum 190311, 20200117 and lorem 13.02.2018 lorem " "ipsum"
+ text = "lorem ipsum 190311, 20200117 and lorem 13.02.2018 lorem ipsum"
self.assertEqual(
parse_date("", text),
try:
import zoneinfo
except ImportError:
- import backports.zoneinfo as zoneinfo
+ from backports import zoneinfo
from django.test import override_settings
from django.test import TestCase
from django.utils import timezone
-from ..models import Correspondent
-from ..models import Document
+from documents.models import Correspondent
+from documents.models import Document
class TestDocument(TestCase):
from django.test import override_settings
from django.test import TestCase
from django.utils import timezone
-from documents.tests.utils import FileSystemAssertsMixin
-from ..file_handling import create_source_path_directory
-from ..file_handling import delete_empty_directories
-from ..file_handling import generate_filename
-from ..models import Correspondent
-from ..models import Document
-from ..models import DocumentType
-from ..models import StoragePath
-from .utils import DirectoriesMixin
-from .utils import FileSystemAssertsMixin
+from documents.file_handling import create_source_path_directory
+from documents.file_handling import delete_empty_directories
+from documents.file_handling import generate_filename
+from documents.models import Correspondent
+from documents.models import Document
+from documents.models import DocumentType
+from documents.models import StoragePath
+from documents.tests.utils import DirectoriesMixin
+from documents.tests.utils import FileSystemAssertsMixin
class TestFileHandling(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
@override_settings(FILENAME_FORMAT="{correspondent}/{correspondent}")
def test_file_renaming_database_error(self):
- document1 = Document.objects.create(
+ Document.objects.create(
mime_type="application/pdf",
storage_type=Document.STORAGE_TYPE_UNENCRYPTED,
checksum="AAAAA",
Path(document.source_path).touch()
# Ensure file deletion after delete
- pk = document.pk
document.delete()
self.assertIsNotFile(
os.path.join(settings.ORIGINALS_DIR, "none", "none.pdf"),
# Check proper handling of files
self.assertIsDir(os.path.join(settings.ORIGINALS_DIR, "none/none"))
- pk = document.pk
document.delete()
self.assertIsNotFile(
def test_move_archive_error(self, m):
def fake_rename(src, dst):
if "archive" in str(src):
- raise OSError()
+ raise OSError
else:
os.remove(src)
Path(dst).touch()
def test_move_file_error(self, m):
def fake_rename(src, dst):
if "original" in str(src):
- raise OSError()
+ raise OSError
else:
os.remove(src)
Path(dst).touch()
from django.test import TestCase
from documents.settings import EXPORTER_FILE_NAME
-from ..management.commands.document_importer import Command
+from documents.management.commands.document_importer import Command
class TestImporter(TestCase):
from django.test import TransactionTestCase
from documents.consumer import ConsumerError
from documents.data_models import ConsumableDocument
-from documents.data_models import DocumentMetadataOverrides
from documents.management.commands import document_consumer
from documents.models import Tag
from documents.tests.utils import DirectoriesMixin
self.assertEqual(element["fields"]["document"], self.d1.id)
self.assertEqual(element["fields"]["user"], self.user.id)
- with paperless_environment() as dirs:
+ with paperless_environment():
self.assertEqual(Document.objects.count(), 4)
Document.objects.all().delete()
Correspondent.objects.all().delete()
os.path.join(self.dirs.media_dir, "documents"),
)
- m = self._do_export(use_filename_format=True)
+ self._do_export(use_filename_format=True)
self.assertIsFile(os.path.join(self.target, "wow1", "c.pdf"))
self.assertIsFile(os.path.join(self.target, "manifest.json"))
)
self.assertFalse(has_archive)
- with paperless_environment() as dirs:
+ with paperless_environment():
self.assertEqual(Document.objects.count(), 4)
Document.objects.all().delete()
self.assertEqual(Document.objects.count(), 0)
)
self.assertFalse(has_thumbnail)
- with paperless_environment() as dirs:
+ with paperless_environment():
self.assertEqual(Document.objects.count(), 4)
Document.objects.all().delete()
self.assertEqual(Document.objects.count(), 0)
has_document = has_document or element["model"] == "documents.document"
self.assertFalse(has_document)
- with paperless_environment() as dirs:
+ with paperless_environment():
self.assertEqual(Document.objects.count(), 4)
Document.objects.all().delete()
self.assertEqual(Document.objects.count(), 0)
os.path.join(self.dirs.media_dir, "documents"),
)
- manifest = self._do_export(use_folder_prefix=True)
+ self._do_export(use_folder_prefix=True)
- with paperless_environment() as dirs:
+ with paperless_environment():
self.assertEqual(Document.objects.count(), 4)
Document.objects.all().delete()
self.assertEqual(Document.objects.count(), 0)
from django.test import override_settings
from django.test import TestCase
-from .. import matching
-from ..models import Correspondent
-from ..models import Document
-from ..models import DocumentType
-from ..models import Tag
-from ..signals import document_consumption_finished
+from documents import matching
+from documents.models import Correspondent
+from documents.models import Document
+from documents.models import DocumentType
+from documents.models import Tag
+from documents.signals import document_consumption_finished
class _TestMatchingBase(TestCase):
def test_parser_missing(self):
Document = self.apps.get_model("documents", "Document")
- doc1 = make_test_document(
+ make_test_document(
Document,
"document",
"invalid/typesss768",
"document.png",
simple_pdf,
)
- doc2 = make_test_document(
+ make_test_document(
Document,
"document",
"invalid/typesss768",
Document = apps.get_model("documents", "Document")
- doc_unrelated = make_test_document(
+ make_test_document(
Document,
"unrelated",
"application/pdf",
simple_pdf2,
"unrelated.pdf",
)
- doc_no_archive = make_test_document(
+ make_test_document(
Document,
"no_archive",
"text/plain",
simple_txt,
"no_archive.txt",
)
- clashB = make_test_document(
+ make_test_document(
Document,
"clash",
"image/jpeg",
from django.test import TestCase
-from ..models import Correspondent
-from ..models import Document
+from documents.models import Correspondent
+from documents.models import Document
from .factories import CorrespondentFactory
from .factories import DocumentFactory
class CorrespondentTestCase(TestCase):
def test___str__(self):
- for s in ("test", "οχι", "test with fun_charÅc'\"terß"):
+ for s in ("test", "oχi", "test with fun_charÅc'\"terß"):
correspondent = CorrespondentFactory.create(name=s)
self.assertEqual(str(correspondent), s)
- No parser class is returned
"""
m.return_value = []
- with TemporaryDirectory() as tmpdir:
+ with TemporaryDirectory():
self.assertIsNone(get_parser_class_for_mime_type("application/pdf"))
@mock.patch("documents.parsers.document_consumer_declaration.send")
)
def test_orphaned_file(self):
- doc = self.make_test_data()
+ self.make_test_data()
Path(self.dirs.originals_dir, "orphaned").touch()
messages = check_sanity()
self.assertTrue(messages.has_warning)
import celery
from django.test import TestCase
from documents.data_models import ConsumableDocument
-from documents.data_models import DocumentMetadataOverrides
from documents.data_models import DocumentSource
from documents.models import PaperlessTask
from documents.signals.handlers import before_task_publish_handler
self.client.cookies.load(
{settings.LANGUAGE_COOKIE_NAME: language_given},
)
- elif settings.LANGUAGE_COOKIE_NAME in self.client.cookies.keys():
+ elif settings.LANGUAGE_COOKIE_NAME in self.client.cookies:
self.client.cookies.pop(settings.LANGUAGE_COOKIE_NAME)
response = self.client.get(
def get_serializer(self, *args, **kwargs):
super().get_serializer(*args, **kwargs)
fields_param = self.request.query_params.get("fields", None)
- if fields_param:
- fields = fields_param.split(",")
- else:
- fields = None
+ fields = fields_param.split(",") if fields_param else None
truncate_content = self.request.query_params.get("truncate_content", "False")
serializer_class = self.get_serializer_class()
kwargs.setdefault("context", self.get_serializer_context())
try:
doc = Document.objects.get(pk=pk)
except Document.DoesNotExist:
- raise Http404()
+ raise Http404
meta = {
"original_checksum": doc.checksum,
response = self.file_response(pk, request, "inline")
return response
except (FileNotFoundError, Document.DoesNotExist):
- raise Http404()
+ raise Http404
@action(methods=["get"], detail=True)
@method_decorator(cache_control(public=False, max_age=315360000))
return HttpResponse(handle, content_type="image/webp")
except (FileNotFoundError, Document.DoesNotExist):
- raise Http404()
+ raise Http404
@action(methods=["get"], detail=True)
def download(self, request, pk=None):
try:
return self.file_response(pk, request, "attachment")
except (FileNotFoundError, Document.DoesNotExist):
- raise Http404()
+ raise Http404
def getNotes(self, doc):
return [
try:
doc = Document.objects.get(pk=pk)
except Document.DoesNotExist:
- raise Http404()
+ raise Http404
currentUser = request.user
elif "more_like_id" in self.request.query_params:
query_class = index.DelayedMoreLikeThisQuery
else:
- raise ValueError()
+ raise ValueError
return query_class(
self.searcher,
def retrieve(self, request, pk=None, *args, **kwargs):
if pk not in self.log_files:
- raise Http404()
+ raise Http404
filename = self.get_log_filename(pk)
if not os.path.isfile(filename):
- raise Http404()
+ raise Http404
with open(filename) as f:
lines = [line.rstrip() for line in f.readlines()]
Error(
writeable_message.format(var),
writeable_hint.format(
- f"\n{dir_mode} {dir_owner} {dir_group} " f"{directory}\n",
+ f"\n{dir_mode} {dir_owner} {dir_group} {directory}\n",
),
),
)
try:
import zoneinfo
except ImportError: # pragma: nocover
- import backports.zoneinfo as zoneinfo
+ from backports import zoneinfo
msgs = []
if settings.TIME_ZONE not in zoneinfo.available_timezones():
msgs.append(
def connect(self):
if not self._authenticated():
- raise DenyConnection()
+ raise DenyConnection
else:
async_to_sync(self.channel_layer.group_add)(
"status_updates",
self.channel_name,
)
- raise AcceptConnection()
+ raise AcceptConnection
def disconnect(self, close_code):
async_to_sync(self.channel_layer.group_discard)(
if "user_permissions" in validated_data:
user_permissions = validated_data.pop("user_permissions")
password = None
- if "password" in validated_data:
- if len(validated_data.get("password").replace("*", "")) > 0:
- password = validated_data.pop("password")
+ if (
+ "password" in validated_data
+ and len(validated_data.get("password").replace("*", "")) > 0
+ ):
+ password = validated_data.pop("password")
user = User.objects.create(**validated_data)
# set groups
if groups:
"django_filters",
"django_celery_results",
"guardian",
-] + env_apps
+ *env_apps,
+]
if DEBUG:
INSTALLED_APPS.append("channels")
)
# X-Frame options for embedded PDF display:
-if DEBUG:
- X_FRAME_OPTIONS = "ANY"
-else:
- X_FRAME_OPTIONS = "SAMEORIGIN"
+X_FRAME_OPTIONS = "ANY" if DEBUG else "SAMEORIGIN"
# The next 3 settings can also be set using just PAPERLESS_URL
_paperless_uri = urlparse(_paperless_url)
CSRF_TRUSTED_ORIGINS.append(_paperless_url)
CORS_ALLOWED_ORIGINS.append(_paperless_url)
- if ALLOWED_HOSTS != ["*"]:
+ if ["*"] != ALLOWED_HOSTS:
ALLOWED_HOSTS.append(_paperless_uri.hostname)
else:
# always allow localhost. Necessary e.g. for healthcheck in docker.
if client_ip is None:
logger.info(
f"Login failed for user `{credentials['username']}`."
- + " Unable to determine IP address.",
+ " Unable to determine IP address.",
)
else:
if is_routable:
# We got the client's IP address
logger.info(
f"Login failed for user `{credentials['username']}`"
- + f" from IP `{client_ip}.`",
+ f" from IP `{client_ip}.`",
)
else:
# The client's IP address is private
logger.info(
f"Login failed for user `{credentials['username']}`"
- + f" from private IP `{client_ip}.`",
+ f" from private IP `{client_ip}.`",
)
include(
[
re_path(
- r"^auth/",
+ "^auth/",
include(
("rest_framework.urls", "rest_framework"),
namespace="rest_framework",
),
),
re_path(
- r"^search/autocomplete/",
+ "^search/autocomplete/",
SearchAutoCompleteView.as_view(),
name="autocomplete",
),
- re_path(r"^statistics/", StatisticsView.as_view(), name="statistics"),
+ re_path("^statistics/", StatisticsView.as_view(), name="statistics"),
re_path(
- r"^documents/post_document/",
+ "^documents/post_document/",
PostDocumentView.as_view(),
name="post_document",
),
re_path(
- r"^documents/bulk_edit/",
+ "^documents/bulk_edit/",
BulkEditView.as_view(),
name="bulk_edit",
),
re_path(
- r"^documents/selection_data/",
+ "^documents/selection_data/",
SelectionDataView.as_view(),
name="selection_data",
),
re_path(
- r"^documents/bulk_download/",
+ "^documents/bulk_download/",
BulkDownloadView.as_view(),
name="bulk_download",
),
re_path(
- r"^remote_version/",
+ "^remote_version/",
RemoteVersionView.as_view(),
name="remoteversion",
),
+ re_path("^ui_settings/", UiSettingsView.as_view(), name="ui_settings"),
re_path(
- r"^ui_settings/",
- UiSettingsView.as_view(),
- name="ui_settings",
- ),
- re_path(
- r"^acknowledge_tasks/",
+ "^acknowledge_tasks/",
AcknowledgeTasksView.as_view(),
name="acknowledge_tasks",
),
re_path(
- r"^mail_accounts/test/",
+ "^mail_accounts/test/",
MailAccountTestView.as_view(),
name="mail_accounts_test",
),
path("token/", views.obtain_auth_token),
- ]
- + api_router.urls,
+ *api_router.urls,
+ ],
),
),
re_path(r"^favicon.ico$", FaviconView.as_view(), name="favicon"),
widgets = {
"password": forms.PasswordInput(),
}
- fields = "__all__"
+ fields = [
+ "name",
+ "imap_server",
+ "username",
+ "imap_security",
+ "username",
+ "password",
+ "is_token",
+ "character_set",
+ ]
class MailAccountAdmin(admin.ModelAdmin):
fieldsets = [
(None, {"fields": ["name", "imap_server", "imap_port"]}),
- (_("Authentication"), {"fields": ["imap_security", "username", "password"]}),
+ (
+ _("Authentication"),
+ {"fields": ["imap_security", "username", "password", "is_token"]},
+ ),
(_("Advanced settings"), {"fields": ["character_set"]}),
]
form = MailAccountAdminForm
"""
Perform mail action on the given mail uid in the mailbox.
"""
- raise NotImplementedError()
+ raise NotImplementedError
class DeleteMailAction(BaseMailAction):
_, self.color = parameter.split(":")
self.color = self.color.strip()
- if not self.color.lower() in APPLE_MAIL_TAG_COLORS.keys():
+ if self.color.lower() not in APPLE_MAIL_TAG_COLORS.keys():
raise MailError("Not a valid AppleMail tag color.")
self.keyword = None
status="SUCCESS",
)
- except Exception as e:
+ except Exception:
ProcessedMail.objects.create(
owner=rule.owner,
rule=rule,
status="FAILED",
error=traceback.format_exc(),
)
- raise e
+ raise
@shared_task
self.log(
"debug",
- f"Rule {rule}: Searching folder with criteria " f"{str(criterias)}",
+ f"Rule {rule}: Searching folder with criteria {str(criterias)}",
)
try:
except Exception as e:
self.log(
"error",
- f"Rule {rule}: Error while processing mail " f"{message.uid}: {e}",
+ f"Rule {rule}: Error while processing mail {message.uid}: {e}",
exc_info=True,
)
for att in message.attachments:
if (
- not att.content_disposition == "attachment"
+ att.content_disposition != "attachment"
and rule.attachment_type
== MailRule.AttachmentProcessing.ATTACHMENTS_ONLY
):
)
continue
- if rule.filter_attachment_filename:
+ if rule.filter_attachment_filename and not fnmatch(
+ att.filename.lower(),
+ rule.filter_attachment_filename.lower(),
+ ):
# Force the filename and pattern to the lowercase
# as this is system dependent otherwise
- if not fnmatch(
- att.filename.lower(),
- rule.filter_attachment_filename.lower(),
- ):
- continue
+ continue
title = self._get_title(message, att, rule)
model_name="mailrule",
name="maximum_age",
field=models.PositiveIntegerField(
- default=30, help_text="Specified in days."
+ default=30,
+ help_text="Specified in days.",
),
),
]
model_name="mailrule",
name="filter_body",
field=models.CharField(
- blank=True, max_length=256, null=True, verbose_name="filter body"
+ blank=True,
+ max_length=256,
+ null=True,
+ verbose_name="filter body",
),
),
migrations.AlterField(
model_name="mailrule",
name="filter_from",
field=models.CharField(
- blank=True, max_length=256, null=True, verbose_name="filter from"
+ blank=True,
+ max_length=256,
+ null=True,
+ verbose_name="filter from",
),
),
migrations.AlterField(
model_name="mailrule",
name="filter_subject",
field=models.CharField(
- blank=True, max_length=256, null=True, verbose_name="filter subject"
+ blank=True,
+ max_length=256,
+ null=True,
+ verbose_name="filter subject",
),
),
migrations.AlterField(
model_name="mailrule",
name="folder",
field=models.CharField(
- default="INBOX", max_length=256, verbose_name="folder"
+ default="INBOX",
+ max_length=256,
+ verbose_name="folder",
),
),
migrations.AlterField(
model_name="mailrule",
name="maximum_age",
field=models.PositiveIntegerField(
- default=30, help_text="Specified in days.", verbose_name="maximum age"
+ default=30,
+ help_text="Specified in days.",
+ verbose_name="maximum age",
),
),
migrations.AlterField(
model_name="mailrule",
name="assign_tags",
field=models.ManyToManyField(
- blank=True, to="documents.Tag", verbose_name="assign this tag"
+ blank=True,
+ to="documents.Tag",
+ verbose_name="assign this tag",
),
),
]
(
"folder",
models.CharField(
- editable=False, max_length=256, verbose_name="folder"
+ editable=False,
+ max_length=256,
+ verbose_name="folder",
),
),
(
"uid",
models.CharField(
- editable=False, max_length=256, verbose_name="uid"
+ editable=False,
+ max_length=256,
+ verbose_name="uid",
),
),
(
"subject",
models.CharField(
- editable=False, max_length=256, verbose_name="subject"
+ editable=False,
+ max_length=256,
+ verbose_name="subject",
),
),
(
(
"status",
models.CharField(
- editable=False, max_length=256, verbose_name="status"
+ editable=False,
+ max_length=256,
+ verbose_name="status",
),
),
(
"error",
models.TextField(
- blank=True, editable=False, null=True, verbose_name="error"
+ blank=True,
+ editable=False,
+ null=True,
+ verbose_name="error",
),
),
(
model_name="mailrule",
name="filter_to",
field=models.CharField(
- blank=True, max_length=256, null=True, verbose_name="filter to"
+ blank=True,
+ max_length=256,
+ null=True,
+ verbose_name="filter to",
),
),
]
model_name="mailaccount",
name="is_token",
field=models.BooleanField(
- default=False, verbose_name="Is token authentication"
+ default=False,
+ verbose_name="Is token authentication",
),
),
]
class AttachmentProcessing(models.IntegerChoices):
ATTACHMENTS_ONLY = 1, _("Only process attachments.")
- EVERYTHING = 2, _("Process all files, including 'inline' " "attachments.")
+ EVERYTHING = 2, _("Process all files, including 'inline' attachments.")
class MailAction(models.IntegerChoices):
DELETE = 1, _("Delete")
except Exception as err:
raise ParseError(
f"Could not parse {document_path}: {err}",
- )
+ ) from err
if not self._parsed.from_values:
self._parsed = None
raise ParseError(
except ParseError as e:
self.log(
"warning",
- f"Error while fetching document metadata for " f"{document_path}: {e}",
+ f"Error while fetching document metadata for {document_path}: {e}",
)
return result
self.text += f"Attachments: {', '.join(att)}\n\n"
- if mail.html != "":
+ if mail.html:
self.text += "HTML content: " + strip_text(self.tika_parse(mail.html))
self.text += f"\n\n{strip_text(mail.text)}"
raise ParseError(
f"Could not parse content with tika server at "
f"{self.tika_server}: {err}",
- )
+ ) from err
if parsed["content"]:
return parsed["content"]
else:
pdf_collection.append(("1_mail.pdf", self.generate_pdf_from_mail(mail)))
- if mail.html == "":
+ if not mail.html:
with open(pdf_path, "wb") as file:
file.write(pdf_collection[0][1])
file.close()
response = requests.post(url_merge, files=files, headers=headers)
response.raise_for_status() # ensure we notice bad responses
except Exception as err:
- raise ParseError(f"Error while converting document to PDF: {err}")
+ raise ParseError(f"Error while converting document to PDF: {err}") from err
with open(pdf_path, "wb") as file:
file.write(response.content)
return text
data["subject"] = clean_html(mail.subject)
- if data["subject"] != "":
+ if data["subject"]:
data["subject_label"] = "Subject"
data["from"] = clean_html(mail.from_values.full)
- if data["from"] != "":
+ if data["from"]:
data["from_label"] = "From"
data["to"] = clean_html(", ".join(address.full for address in mail.to_values))
- if data["to"] != "":
+ if data["to"]:
data["to_label"] = "To"
data["cc"] = clean_html(", ".join(address.full for address in mail.cc_values))
- if data["cc"] != "":
+ if data["cc"]:
data["cc_label"] = "CC"
data["bcc"] = clean_html(", ".join(address.full for address in mail.bcc_values))
- if data["bcc"] != "":
+ if data["bcc"]:
data["bcc_label"] = "BCC"
att = []
for a in mail.attachments:
att.append(f"{a.filename} ({format_size(a.size, binary=True)})")
data["attachments"] = clean_html(", ".join(att))
- if data["attachments"] != "":
+ if data["attachments"]:
data["attachments_label"] = "Attachments"
data["date"] = clean_html(mail.date.astimezone().strftime("%Y-%m-%d %H:%M"))
)
response.raise_for_status() # ensure we notice bad responses
except Exception as err:
- raise ParseError(f"Error while converting document to PDF: {err}")
+ raise ParseError(
+ f"Error while converting document to PDF: {err}",
+ ) from err
return response.content
)
response.raise_for_status() # ensure we notice bad responses
except Exception as err:
- raise ParseError(f"Error while converting document to PDF: {err}")
+ raise ParseError(f"Error while converting document to PDF: {err}") from err
return response.content
]
def update(self, instance, validated_data):
- if "password" in validated_data:
- if len(validated_data.get("password").replace("*", "")) == 0:
- validated_data.pop("password")
+ if (
+ "password" in validated_data
+ and len(validated_data.get("password").replace("*", "")) == 0
+ ):
+ validated_data.pop("password")
super().update(instance, validated_data)
return instance
except MailError as e:
self.fail(f"Failure: {e}")
- except Exception as e:
+ except Exception:
pass
def test_process_non_gmail_server_tag(self):
except MailError as e:
self.fail(f"Failure: {e}")
- except Exception as e:
+ except Exception:
pass
from django.core.management import call_command
from django.db import DatabaseError
from django.test import TestCase
-from documents.data_models import ConsumableDocument
-from documents.data_models import DocumentMetadataOverrides
from documents.models import Correspondent
from documents.tests.utils import DirectoriesMixin
-from documents.tests.utils import DocumentConsumeDelayMixin
from documents.tests.utils import FileSystemAssertsMixin
from imap_tools import EmailAddress
from imap_tools import FolderInfo
)
self.messages = list(filter(lambda m: m.uid not in uid_list, self.messages))
else:
- raise Exception()
+ raise Exception
def fake_magic_from_buffer(buffer, mime=False):
with self.assertRaisesRegex(
MailError,
"Error while authenticating account",
- ) as context:
+ ):
self.mail_account_handler.handle_mail_account(account)
def test_error_skip_account(self):
try:
mailbox_login(M, account)
return Response({"success": True})
- except MailError as e:
+ except MailError:
logger.error(
- f"Mail account {account} test failed: {e}",
- exc_info=False,
+ f"Mail account {account} test failed",
)
return HttpResponseBadRequest("Unable to connect to server")
except Exception as e:
self.log(
"warning",
- f"Error while reading metadata {key}: {value}. Error: " f"{e}",
+ f"Error while reading metadata {key}: {value}. Error: {e}",
)
return result
return post_process_text(text)
except Exception:
- # TODO catch all for various issues with PDFminer.six.
# If pdftotext fails, fall back to OCR.
self.log(
"warning",
- "Error while getting text from PDF document with " "pdfminer.six",
+ "Error while getting text from PDF document with pdftotext",
exc_info=True,
)
# probably not a PDF file.
def parse(self, document_path: Path, mime_type, file_name=None):
# This forces tesseract to use one core per page.
os.environ["OMP_THREAD_LIMIT"] = "1"
+ VALID_TEXT_LENGTH = 50
if mime_type == "application/pdf":
text_original = self.extract_text(None, document_path)
- original_has_text = text_original is not None and len(text_original) > 50
+ original_has_text = (
+ text_original is not None and len(text_original) > VALID_TEXT_LENGTH
+ )
else:
text_original = None
original_has_text = False
class TestChecks(TestCase):
def test_default_language(self):
- msgs = check_default_language_available(None)
+ check_default_language_available(None)
@override_settings(OCR_LANGUAGE="")
def test_no_language(self):
except Exception as e:
self.log(
"warning",
- f"Error while fetching document metadata for " f"{document_path}: {e}",
+ f"Error while fetching document metadata for {document_path}: {e}",
)
return []
except Exception as e:
self.log(
"warning",
- f"Unable to extract date for document " f"{document_path}: {e}",
+ f"Unable to extract date for document {document_path}: {e}",
)
self.archive_path = self.convert_to_pdf(document_path, file_name)