exclude: "(^Pipfile\\.lock$)"
# Python hooks
- repo: https://github.com/asottile/reorder_python_imports
- rev: v3.0.1
+ rev: v3.1.0
hooks:
- id: reorder-python-imports
exclude: "(migrations)"
rev: 22.3.0
hooks:
- id: black
+ - repo: https://github.com/asottile/pyupgrade
+ rev: v2.32.1
+ hooks:
+ - id: pyupgrade
+ exclude: "(migrations)"
+ args:
+ - "--py38-plus"
# Dockerfile hooks
- repo: https://github.com/AleksaC/hadolint-py
rev: v2.10.0
## get traceback info
import threading, sys, traceback
- id2name = dict([(th.ident, th.name) for th in threading.enumerate()])
+ id2name = {th.ident: th.name for th in threading.enumerate()}
code = []
for threadId, stack in sys._current_frames().items():
code.append("\n# Thread: %s(%d)" % (id2name.get(threadId, ""), threadId))
for o in queryset:
index.remove_document(writer, o)
- super(DocumentAdmin, self).delete_queryset(request, queryset)
+ super().delete_queryset(request, queryset)
def delete_model(self, request, obj):
from documents import index
index.remove_document_from_index(obj)
- super(DocumentAdmin, self).delete_model(request, obj)
+ super().delete_model(request, obj)
def save_model(self, request, obj, form, change):
from documents import index
index.add_or_update_document(obj)
- super(DocumentAdmin, self).save_model(request, obj, form, change)
+ super().save_model(request, obj, form, change)
class RuleInline(admin.TabularInline):
class ArchiveOnlyStrategy(BulkArchiveStrategy):
def __init__(self, zipf):
- super(ArchiveOnlyStrategy, self).__init__(zipf)
+ super().__init__(zipf)
def add_document(self, doc: Document):
if doc.has_archive_version:
return classifier
-class DocumentClassifier(object):
+class DocumentClassifier:
# v7 - Updated scikit-learn package version
FORMAT_VERSION = 7
labels_correspondent.append(y)
tags = sorted(
- [
- tag.pk
- for tag in doc.tags.filter(
- matching_algorithm=MatchingModel.MATCH_AUTO,
- )
- ],
+ tag.pk
+ for tag in doc.tags.filter(
+ matching_algorithm=MatchingModel.MATCH_AUTO,
+ )
)
for tag in tags:
m.update(tag.to_bytes(4, "little", signed=True))
if self.data_hash and new_data_hash == self.data_hash:
return False
- labels_tags_unique = set([tag for tags in labels_tags for tag in tags])
+ labels_tags_unique = {tag for tags in labels_tags for tag in tags}
num_tags = len(labels_tags_unique)
try:
self._send_progress(20, 100, "WORKING", MESSAGE_PARSING_DOCUMENT)
- self.log("debug", "Parsing {}...".format(self.filename))
+ self.log("debug", f"Parsing {self.filename}...")
document_parser.parse(self.path, mime_type, self.filename)
self.log("debug", f"Generating thumbnail for {self.filename}...")
document.save()
# Delete the file only if it was successfully consumed
- self.log("debug", "Deleting file {}".format(self.path))
+ self.log("debug", f"Deleting file {self.path}")
os.unlink(self.path)
# https://github.com/jonaswinkler/paperless-ng/discussions/1037
)
if os.path.isfile(shadow_file):
- self.log("debug", "Deleting file {}".format(shadow_file))
+ self.log("debug", f"Deleting file {shadow_file}")
os.unlink(shadow_file)
except Exception as e:
self.run_post_consume_script(document)
- self.log("info", "Document {} consumption finished".format(document))
+ self.log("info", f"Document {document} consumption finished")
self._send_progress(100, 100, "SUCCESS", MESSAGE_FINISHED, document.id)
tags = defaultdictNoStr(lambda: slugify(None), many_to_dictionary(doc.tags))
tag_list = pathvalidate.sanitize_filename(
- ",".join(sorted([tag.name for tag in doc.tags.all()])),
+ ",".join(sorted(tag.name for tag in doc.tags.all())),
replacement_text="-",
)
class TagsFilter(Filter):
def __init__(self, exclude=False, in_list=False):
- super(TagsFilter, self).__init__()
+ super().__init__()
self.exclude = exclude
self.in_list = in_list
for document in encrypted_files:
- print("Decrypting {}".format(document).encode("utf-8"))
+ print(f"Decrypting {document}".encode())
old_paths = [document.source_path, document.thumbnail_path]
def find_fixtures(self, fixture_label):
if fixture_label == "-":
return [("-", None, "-")]
- return super(Command, self).find_fixtures(fixture_label)
+ return super().find_fixtures(fixture_label)
-# coding=utf-8
import datetime
import logging
import os
if self.filename:
fname = str(self.filename)
else:
- fname = "{:07}{}".format(self.pk, self.file_type)
+ fname = f"{self.pk:07}{self.file_type}"
if self.storage_type == self.STORAGE_TYPE_GPG:
fname += ".gpg" # pragma: no cover
@property
def thumbnail_path(self):
- file_name = "{:07}.png".format(self.pk)
+ file_name = f"{self.pk:07}.png"
if self.storage_type == self.STORAGE_TYPE_GPG:
file_name += ".gpg"
@classmethod
def _get_created(cls, created):
try:
- return dateutil.parser.parse("{:0<14}Z".format(created[:-1]))
+ return dateutil.parser.parse(f"{created[:-1]:0<14}Z")
except ValueError:
return None
@classmethod
def _mangle_property(cls, properties, name):
if name in properties:
- properties[name] = getattr(cls, "_get_{}".format(name))(properties[name])
+ properties[name] = getattr(cls, f"_get_{name}")(properties[name])
@classmethod
def from_filename(cls, filename):
logger.debug("Execute: " + " ".join(args), extra={"group": logging_group})
if not subprocess.Popen(args, env=environment).wait() == 0:
- raise ParseError("Convert failed at {}".format(args))
+ raise ParseError(f"Convert failed at {args}")
def get_default_thumbnail():
cmd = [settings.GS_BINARY, "-q", "-sDEVICE=pngalpha", "-o", gs_out_path, in_path]
try:
if not subprocess.Popen(cmd).wait() == 0:
- raise ParseError("Thumbnail (gs) failed at {}".format(cmd))
+ raise ParseError(f"Thumbnail (gs) failed at {cmd}")
# then run convert on the output from gs
run_convert(
density=300,
strip=True,
trim=False,
auto_orient=True,
- input_file="{}[0]".format(in_path),
+ input_file=f"{in_path}[0]",
output_file=out_path,
logging_group=logging_group,
)
self.log("debug", f"Execute: {' '.join(args)}")
if not subprocess.Popen(args).wait() == 0:
- raise ParseError("Optipng failed at {}".format(args))
+ raise ParseError(f"Optipng failed at {args}")
return out_path
else:
fields = kwargs.pop("fields", None)
# Instantiate the superclass normally
- super(DynamicFieldsModelSerializer, self).__init__(*args, **kwargs)
+ super().__init__(*args, **kwargs)
if fields is not None:
# Drop any fields that are not specified in the `fields` argument.
rules_data = validated_data.pop("filter_rules")
else:
rules_data = None
- super(SavedViewSerializer, self).update(instance, validated_data)
+ super().update(instance, validated_data)
if rules_data is not None:
SavedViewFilterRule.objects.filter(saved_view=instance).delete()
for rule_data in rules_data:
try:
if classifier.train():
logger.info(
- "Saving updated classifier model to {}...".format(settings.MODEL_FILE),
+ f"Saving updated classifier model to {settings.MODEL_FILE}...",
)
classifier.save()
else:
for n, page in enumerate(pdf.pages):
if n < pages_to_split_on[0]:
dst.pages.append(page)
- output_filename = "{}_document_0.pdf".format(fname)
+ output_filename = f"{fname}_document_0.pdf"
savepath = os.path.join(tempdir, output_filename)
with open(savepath, "wb") as out:
dst.save(out)
f"page_number: {str(page_number)} next_page: {str(next_page)}",
)
dst.pages.append(pdf.pages[page])
- output_filename = "{}_document_{}.pdf".format(fname, str(count + 1))
+ output_filename = f"{fname}_document_{str(count + 1)}.pdf"
logger.debug(f"pdf no:{str(count)} has {str(len(dst.pages))} pages")
savepath = os.path.join(tempdir, output_filename)
with open(savepath, "wb") as out:
# if we got here, the document was successfully split
# and can safely be deleted
if converted_tiff:
- logger.debug("Deleting file {}".format(file_to_process))
+ logger.debug(f"Deleting file {file_to_process}")
os.unlink(file_to_process)
- logger.debug("Deleting file {}".format(path))
+ logger.debug(f"Deleting file {path}")
os.unlink(path)
# notify the sender, otherwise the progress bar
# in the UI stays stuck
)
if document:
- return "Success. New document id {} created".format(document.pk)
+ return f"Success. New document id {document.pk} created"
else:
raise ConsumerError(
"Unknown error: Returned document was null, but "
return searcher.document(id=doc.id)
def setUp(self) -> None:
- super(TestDocumentAdmin, self).setUp()
+ super().setUp()
self.doc_admin = DocumentAdmin(model=Document, admin_site=AdminSite())
def test_save_model(self):
class TestDocumentApi(DirectoriesMixin, APITestCase):
def setUp(self):
- super(TestDocumentApi, self).setUp()
+ super().setUp()
self.user = User.objects.create_superuser(username="temp_admin")
self.client.force_login(user=self.user)
returned_doc["title"] = "the new title"
response = self.client.put(
- "/api/documents/{}/".format(doc.pk),
+ f"/api/documents/{doc.pk}/",
returned_doc,
format="json",
)
self.assertEqual(doc_after_save.correspondent, c2)
self.assertEqual(doc_after_save.title, "the new title")
- self.client.delete("/api/documents/{}/".format(doc_after_save.pk))
+ self.client.delete(f"/api/documents/{doc_after_save.pk}/")
self.assertEqual(len(Document.objects.all()), 0)
)
with open(
- os.path.join(self.dirs.thumbnail_dir, "{:07d}.png".format(doc.pk)),
+ os.path.join(self.dirs.thumbnail_dir, f"{doc.pk:07d}.png"),
"wb",
) as f:
f.write(content_thumbnail)
- response = self.client.get("/api/documents/{}/download/".format(doc.pk))
+ response = self.client.get(f"/api/documents/{doc.pk}/download/")
self.assertEqual(response.status_code, 200)
self.assertEqual(response.content, content)
- response = self.client.get("/api/documents/{}/preview/".format(doc.pk))
+ response = self.client.get(f"/api/documents/{doc.pk}/preview/")
self.assertEqual(response.status_code, 200)
self.assertEqual(response.content, content)
- response = self.client.get("/api/documents/{}/thumb/".format(doc.pk))
+ response = self.client.get(f"/api/documents/{doc.pk}/thumb/")
self.assertEqual(response.status_code, 200)
self.assertEqual(response.content, content_thumbnail)
with open(doc.archive_path, "wb") as f:
f.write(content_archive)
- response = self.client.get("/api/documents/{}/download/".format(doc.pk))
+ response = self.client.get(f"/api/documents/{doc.pk}/download/")
self.assertEqual(response.status_code, 200)
self.assertEqual(response.content, content_archive)
response = self.client.get(
- "/api/documents/{}/download/?original=true".format(doc.pk),
+ f"/api/documents/{doc.pk}/download/?original=true",
)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.content, content)
- response = self.client.get("/api/documents/{}/preview/".format(doc.pk))
+ response = self.client.get(f"/api/documents/{doc.pk}/preview/")
self.assertEqual(response.status_code, 200)
self.assertEqual(response.content, content_archive)
response = self.client.get(
- "/api/documents/{}/preview/?original=true".format(doc.pk),
+ f"/api/documents/{doc.pk}/preview/?original=true",
)
self.assertEqual(response.status_code, 200)
mime_type="application/pdf",
)
- response = self.client.get("/api/documents/{}/download/".format(doc.pk))
+ response = self.client.get(f"/api/documents/{doc.pk}/download/")
self.assertEqual(response.status_code, 404)
- response = self.client.get("/api/documents/{}/preview/".format(doc.pk))
+ response = self.client.get(f"/api/documents/{doc.pk}/preview/")
self.assertEqual(response.status_code, 404)
- response = self.client.get("/api/documents/{}/thumb/".format(doc.pk))
+ response = self.client.get(f"/api/documents/{doc.pk}/thumb/")
self.assertEqual(response.status_code, 404)
def test_document_filters(self):
self.assertCountEqual([results[0]["id"], results[1]["id"]], [doc2.id, doc3.id])
response = self.client.get(
- "/api/documents/?tags__id__in={},{}".format(tag_inbox.id, tag_3.id),
+ f"/api/documents/?tags__id__in={tag_inbox.id},{tag_3.id}",
)
self.assertEqual(response.status_code, 200)
results = response.data["results"]
self.assertCountEqual([results[0]["id"], results[1]["id"]], [doc1.id, doc3.id])
response = self.client.get(
- "/api/documents/?tags__id__in={},{}".format(tag_2.id, tag_3.id),
+ f"/api/documents/?tags__id__in={tag_2.id},{tag_3.id}",
)
self.assertEqual(response.status_code, 200)
results = response.data["results"]
self.assertCountEqual([results[0]["id"], results[1]["id"]], [doc2.id, doc3.id])
response = self.client.get(
- "/api/documents/?tags__id__all={},{}".format(tag_2.id, tag_3.id),
+ f"/api/documents/?tags__id__all={tag_2.id},{tag_3.id}",
)
self.assertEqual(response.status_code, 200)
results = response.data["results"]
self.assertEqual(results[0]["id"], doc3.id)
response = self.client.get(
- "/api/documents/?tags__id__all={},{}".format(tag_inbox.id, tag_3.id),
+ f"/api/documents/?tags__id__all={tag_inbox.id},{tag_3.id}",
)
self.assertEqual(response.status_code, 200)
results = response.data["results"]
self.assertEqual(len(results), 0)
response = self.client.get(
- "/api/documents/?tags__id__all={}a{}".format(tag_inbox.id, tag_3.id),
+ f"/api/documents/?tags__id__all={tag_inbox.id}a{tag_3.id}",
)
self.assertEqual(response.status_code, 200)
results = response.data["results"]
self.assertEqual(len(results), 3)
- response = self.client.get("/api/documents/?tags__id__none={}".format(tag_3.id))
+ response = self.client.get(f"/api/documents/?tags__id__none={tag_3.id}")
self.assertEqual(response.status_code, 200)
results = response.data["results"]
self.assertEqual(len(results), 2)
self.assertCountEqual([results[0]["id"], results[1]["id"]], [doc1.id, doc2.id])
response = self.client.get(
- "/api/documents/?tags__id__none={},{}".format(tag_3.id, tag_2.id),
+ f"/api/documents/?tags__id__none={tag_3.id},{tag_2.id}",
)
self.assertEqual(response.status_code, 200)
results = response.data["results"]
self.assertEqual(results[0]["id"], doc1.id)
response = self.client.get(
- "/api/documents/?tags__id__none={},{}".format(tag_2.id, tag_inbox.id),
+ f"/api/documents/?tags__id__none={tag_2.id},{tag_inbox.id}",
)
self.assertEqual(response.status_code, 200)
results = response.data["results"]
class TestDocumentApiV2(DirectoriesMixin, APITestCase):
def setUp(self):
- super(TestDocumentApiV2, self).setUp()
+ super().setUp()
self.user = User.objects.create_superuser(username="temp_admin")
class TestBulkEdit(DirectoriesMixin, APITestCase):
def setUp(self):
- super(TestBulkEdit, self).setUp()
+ super().setUp()
user = User.objects.create_superuser(username="temp_admin")
self.client.force_login(user=user)
class TestBulkDownload(DirectoriesMixin, APITestCase):
def setUp(self):
- super(TestBulkDownload, self).setUp()
+ super().setUp()
user = User.objects.create_superuser(username="temp_admin")
self.client.force_login(user=user)
class TestClassifier(DirectoriesMixin, TestCase):
def setUp(self):
- super(TestClassifier, self).setUp()
+ super().setUp()
self.classifier = DocumentClassifier()
def generate_test_data(self):
self.assertEqual(file_info.title, title, filename)
- self.assertEqual(tuple([t.name for t in file_info.tags]), tags, filename)
+ self.assertEqual(tuple(t.name for t in file_info.tags), tags, filename)
def test_guess_attributes_from_name_when_title_starts_with_dash(self):
self._test_guess_attributes_from_name(
raise NotImplementedError()
def __init__(self, logging_group, scratch_dir, archive_path):
- super(DummyParser, self).__init__(logging_group, None)
+ super().__init__(logging_group, None)
_, self.fake_thumb = tempfile.mkstemp(suffix=".png", dir=scratch_dir)
self.archive_path = archive_path
return self.fake_thumb
def __init__(self, logging_group, progress_callback=None):
- super(CopyParser, self).__init__(logging_group, progress_callback)
+ super().__init__(logging_group, progress_callback)
_, self.fake_thumb = tempfile.mkstemp(suffix=".png", dir=self.tempdir)
def parse(self, document_path, mime_type, file_name=None):
raise NotImplementedError()
def __init__(self, logging_group, scratch_dir):
- super(FaultyParser, self).__init__(logging_group)
+ super().__init__(logging_group)
_, self.fake_thumb = tempfile.mkstemp(suffix=".png", dir=scratch_dir)
def get_optimised_thumbnail(self, document_path, mime_type, file_name=None):
return FaultyParser(logging_group, self.dirs.scratch_dir)
def setUp(self):
- super(TestConsumer, self).setUp()
+ super().setUp()
patcher = mock.patch("documents.parsers.document_consumer_declaration.send")
m = patcher.start()
os.path.dirname(__file__),
"../../paperless_tesseract/tests/samples",
)
- SCRATCH = "/tmp/paperless-tests-{}".format(str(uuid4())[:8])
+ SCRATCH = f"/tmp/paperless-tests-{str(uuid4())[:8]}"
def setUp(self):
os.makedirs(self.SCRATCH, exist_ok=True)
document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
document.save()
- self.assertEqual(generate_filename(document), "{:07d}.pdf".format(document.pk))
+ self.assertEqual(generate_filename(document), f"{document.pk:07d}.pdf")
document.storage_type = Document.STORAGE_TYPE_GPG
self.assertEqual(
generate_filename(document),
- "{:07d}.pdf.gpg".format(document.pk),
+ f"{document.pk:07d}.pdf.gpg",
)
@override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/{correspondent}")
# Test default source_path
self.assertEqual(
document.source_path,
- settings.ORIGINALS_DIR + "/{:07d}.pdf".format(document.pk),
+ settings.ORIGINALS_DIR + f"/{document.pk:07d}.pdf",
)
document.filename = generate_filename(document)
sample_file = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf")
def setUp(self) -> None:
- super(ConsumerMixin, self).setUp()
+ super().setUp()
self.t = None
patcher = mock.patch(
"documents.management.commands.document_consumer.async_task",
# wait for the consumer to exit.
self.t.join()
- super(ConsumerMixin, self).tearDown()
+ super().tearDown()
def wait_for_task_mock_call(self, excpeted_call_count=1):
n = 0
self.d1.correspondent = self.c1
self.d1.document_type = self.dt1
self.d1.save()
- super(TestExportImport, self).setUp()
+ super().setUp()
def _get_document_from_manifest(self, manifest, id):
f = list(
)
def setUp(self) -> None:
- super(TestRetagger, self).setUp()
+ super().setUp()
self.make_models()
def test_add_tags(self):
)
def setUp(self) -> None:
- super(TestMakeThumbnails, self).setUp()
+ super().setUp()
self.make_models()
def test_process_document(self):
doc = Document(content=string)
self.assertTrue(
matching.matches(instance, doc),
- '"%s" should match "%s" but it does not' % (match_text, string),
+ f'"{match_text}" should match "{string}" but it does not',
)
for string in no_match:
doc = Document(content=string)
self.assertFalse(
matching.matches(instance, doc),
- '"%s" should not match "%s" but it does' % (match_text, string),
+ f'"{match_text}" should not match "{string}" but it does',
)
if self.filename:
fname = archive_name_from_filename(self.filename)
else:
- fname = "{:07}.pdf".format(self.pk)
+ fname = f"{self.pk:07}.pdf"
return os.path.join(settings.ARCHIVE_DIR, fname)
if doc.filename:
fname = str(doc.filename)
else:
- fname = "{:07}{}".format(doc.pk, doc.file_type)
+ fname = f"{doc.pk:07}{doc.file_type}"
if doc.storage_type == STORAGE_TYPE_GPG:
fname += ".gpg" # pragma: no cover
def thumbnail_path(doc):
- file_name = "{:07}.png".format(doc.pk)
+ file_name = f"{doc.pk:07}.png"
if doc.storage_type == STORAGE_TYPE_GPG:
file_name += ".gpg"
if self.filename:
fname = str(self.filename)
else:
- fname = "{:07}.{}".format(self.pk, self.file_type)
+ fname = f"{self.pk:07}.{self.file_type}"
if self.storage_type == STORAGE_TYPE_GPG:
fname += ".gpg"
if doc.filename:
fname = str(doc.filename)
else:
- fname = "{:07}{}".format(doc.pk, file_type_after(doc))
+ fname = f"{doc.pk:07}{file_type_after(doc)}"
if doc.storage_type == STORAGE_TYPE_GPG:
fname += ".gpg" # pragma: no cover
class TestParserDiscovery(TestCase):
@mock.patch("documents.parsers.document_consumer_declaration.send")
def test__get_parser_class_1_parser(self, m, *args):
- class DummyParser(object):
+ class DummyParser:
pass
m.return_value = (
@mock.patch("documents.parsers.document_consumer_declaration.send")
def test__get_parser_class_n_parsers(self, m, *args):
- class DummyParser1(object):
+ class DummyParser1:
pass
- class DummyParser2(object):
+ class DummyParser2:
pass
m.return_value = (
def setUp(self) -> None:
self.dirs = setup_directories()
- super(DirectoriesMixin, self).setUp()
+ super().setUp()
def tearDown(self) -> None:
- super(DirectoriesMixin, self).tearDown()
+ super().tearDown()
remove_dirs(self.dirs)
auto_migrate = True
def setUp(self):
- super(TestMigrations, self).setUp()
+ super().setUp()
assert (
self.migrate_from and self.migrate_to
return serializer_class(*args, **kwargs)
def update(self, request, *args, **kwargs):
- response = super(DocumentViewSet, self).update(request, *args, **kwargs)
+ response = super().update(request, *args, **kwargs)
from documents import index
index.add_or_update_document(self.get_object())
from documents import index
index.remove_document_from_index(self.get_object())
- return super(DocumentViewSet, self).destroy(request, *args, **kwargs)
+ return super().destroy(request, *args, **kwargs)
@staticmethod
def original_requested(request):
class SearchResultSerializer(DocumentSerializer):
def to_representation(self, instance):
doc = Document.objects.get(id=instance["id"])
- r = super(SearchResultSerializer, self).to_representation(doc)
+ r = super().to_representation(doc)
r["__search_hit__"] = {
"score": instance.score,
"highlights": instance.highlights("content", text=doc.content)
class UnifiedSearchViewSet(DocumentViewSet):
def __init__(self, *args, **kwargs):
- super(UnifiedSearchViewSet, self).__init__(*args, **kwargs)
+ super().__init__(*args, **kwargs)
self.searcher = None
def get_serializer_class(self):
self.paginator.get_page_size(self.request),
)
else:
- return super(UnifiedSearchViewSet, self).filter_queryset(queryset)
+ return super().filter_queryset(queryset)
def list(self, request, *args, **kwargs):
if self._is_search_request():
try:
with index.open_index_searcher() as s:
self.searcher = s
- return super(UnifiedSearchViewSet, self).list(request)
+ return super().list(request)
except NotFound:
raise
except Exception as e:
return HttpResponseBadRequest(str(e))
else:
- return super(UnifiedSearchViewSet, self).list(request)
+ return super().list(request)
class LogViewSet(ViewSet):
if not os.path.isfile(filename):
raise Http404()
- with open(filename, "r") as f:
+ with open(filename) as f:
lines = [line.rstrip() for line in f.readlines()]
return Response(lines)
and request.headers["Referer"].startswith("http://localhost:4200/")
):
user = User.objects.filter(is_staff=True).first()
- print("Auto-Login with user {}".format(user))
+ print(f"Auto-Login with user {user}")
return (user, None)
else:
return None
@dataclasses.dataclass
-class _AttachmentDef(object):
+class _AttachmentDef:
filename: str = "a_file.pdf"
maintype: str = "application/pdf"
subtype: str = "pdf"
self.current_folder = new_folder
-class BogusClient(object):
+class BogusClient:
def authenticate(self, mechanism, authobject):
# authobject must be a callable object
auth_bytes = authobject(None)
self.reset_bogus_mailbox()
self.mail_account_handler = MailAccountHandler()
- super(TestMail, self).setUp()
+ super().setUp()
def reset_bogus_mailbox(self):
self.bogus_mailbox.messages = []
self.assertEqual(result, len(matches), f"Error with pattern: {pattern}")
filenames = sorted(
- [a[1]["override_filename"] for a in self.async_task.call_args_list],
+ a[1]["override_filename"] for a in self.async_task.call_args_list
)
self.assertListEqual(filenames, matches)
def extract_text(self, sidecar_file, pdf_file):
if sidecar_file and os.path.isfile(sidecar_file):
- with open(sidecar_file, "r") as f:
+ with open(sidecar_file) as f:
text = f.read()
if "[OCR skipped on page" not in text:
def get_thumbnail(self, document_path, mime_type, file_name=None):
def read_text():
- with open(document_path, "r") as src:
+ with open(document_path) as src:
lines = [line.strip() for line in src.readlines()]
text = "\n".join(lines[:50])
return text
return out_path
def parse(self, document_path, mime_type, file_name=None):
- with open(document_path, "r") as f:
+ with open(document_path) as f:
self.text = f.read()