"RUF", # https://docs.astral.sh/ruff/rules/#ruff-specific-rules-ruf
"FLY", # https://docs.astral.sh/ruff/rules/#flynt-fly
"PTH", # https://docs.astral.sh/ruff/rules/#flake8-use-pathlib-pth
+ "FBT", # https://docs.astral.sh/ruff/rules/#flake8-boolean-trap-fbt
]
ignore = ["DJ001", "SIM105", "RUF012"]
class BulkArchiveStrategy:
- def __init__(self, zipf: ZipFile, follow_formatting: bool = False) -> None:
+ def __init__(self, zipf: ZipFile, *, follow_formatting: bool = False) -> None:
self.zipf: ZipFile = zipf
if follow_formatting:
self.make_unique_filename: Callable[..., Path | str] = (
def _filename_only(
self,
doc: Document,
+ *,
archive: bool = False,
folder: str = "",
) -> str:
"""
counter = 0
while True:
- filename: str = folder + doc.get_public_filename(archive, counter)
+ filename: str = folder + doc.get_public_filename(
+ archive=archive,
+ counter=counter,
+ )
if filename in self.zipf.namelist():
counter += 1
else:
def _formatted_filepath(
self,
doc: Document,
+ *,
archive: bool = False,
folder: str = "",
) -> Path:
def set_permissions(
doc_ids: list[int],
set_permissions,
+ *,
owner=None,
merge=False,
) -> Literal["OK"]:
def merge(
doc_ids: list[int],
+ *,
metadata_document_id: int | None = None,
delete_originals: bool = False,
user: User | None = None,
def split(
doc_ids: list[int],
pages: list[list[int]],
+ *,
delete_originals: bool = False,
user: User | None = None,
) -> Literal["OK"]:
directory = os.path.normpath(os.path.dirname(directory))
-def generate_unique_filename(doc, archive_filename=False):
+def generate_unique_filename(doc, *, archive_filename=False):
"""
Generates a unique filename for doc in settings.ORIGINALS_DIR.
while True:
new_filename = generate_filename(
doc,
- counter,
+ counter=counter,
archive_filename=archive_filename,
)
if new_filename == old_filename:
def generate_filename(
doc: Document,
+ *,
counter=0,
append_gpg=True,
archive_filename=False,
class ObjectFilter(Filter):
- def __init__(self, exclude=False, in_list=False, field_name=""):
+ def __init__(self, *, exclude=False, in_list=False, field_name=""):
super().__init__()
self.exclude = exclude
self.in_list = in_list
)
-def open_index(recreate=False) -> FileIndex:
+def open_index(*, recreate=False) -> FileIndex:
try:
if exists_in(settings.INDEX_DIR) and not recreate:
return open_dir(settings.INDEX_DIR, schema=get_schema())
@contextmanager
-def open_index_writer(optimize=False) -> AsyncWriter:
+def open_index_writer(*, optimize=False) -> AsyncWriter:
writer = AsyncWriter(open_index())
try:
def get_permissions_criterias(user: User | None = None) -> list:
- user_criterias = [query.Term("has_owner", False)]
+ user_criterias = [query.Term("has_owner", text=False)]
if user is not None:
if user.is_superuser: # superusers see all docs
user_criterias = []
# This code is taken almost entirely from https://github.com/wagtail/wagtail/pull/11912 with all credit to the original author.
help = "Converts UUID columns from char type to the native UUID type used in MariaDB 10.7+ and Django 5.0+."
- def convert_field(self, model, field_name, null=False):
+ def convert_field(self, model, field_name, *, null=False):
if model._meta.get_field(field_name).model != model: # pragma: no cover
# Field is inherited from a parent model
return
return
if settings.CONSUMER_POLLING == 0 and INotify:
- self.handle_inotify(directory, recursive, options["testing"])
+ self.handle_inotify(directory, recursive, is_testing=options["testing"])
else:
if INotify is None and settings.CONSUMER_POLLING == 0: # pragma: no cover
logger.warning("Using polling as INotify import failed")
- self.handle_polling(directory, recursive, options["testing"])
+ self.handle_polling(directory, recursive, is_testing=options["testing"])
logger.debug("Consumer exiting.")
- def handle_polling(self, directory, recursive, is_testing: bool):
+ def handle_polling(self, directory, recursive, *, is_testing: bool):
logger.info(f"Polling directory for changes: {directory}")
timeout = None
observer.stop()
observer.join()
- def handle_inotify(self, directory, recursive, is_testing: bool):
+ def handle_inotify(self, directory, recursive, *, is_testing: bool):
logger.info(f"Using inotify to watch directory for changes: {directory}")
timeout_ms = None
return os.path.join(settings.ORIGINALS_DIR, fname)
-def generate_unique_filename(doc, archive_filename=False):
+def generate_unique_filename(doc, *, archive_filename=False):
if archive_filename:
old_filename = doc.archive_filename
root = settings.ARCHIVE_DIR
while True:
new_filename = generate_filename(
doc,
- counter,
+ counter=counter,
archive_filename=archive_filename,
)
if new_filename == old_filename:
return new_filename
-def generate_filename(doc, counter=0, append_gpg=True, archive_filename=False):
+def generate_filename(doc, *, counter=0, append_gpg=True, archive_filename=False):
path = ""
try:
def archive_file(self):
return open(self.archive_path, "rb")
- def get_public_filename(self, archive=False, counter=0, suffix=None) -> str:
+ def get_public_filename(self, *, archive=False, counter=0, suffix=None) -> str:
"""
Returns a sanitized filename for the document, not including any paths.
"""
def run_convert(
input_file,
output_file,
+ *,
density=None,
scale=None,
alpha=None,
return Group.objects.filter(id__in=group_object_perm_group_ids).distinct()
-def set_permissions_for_object(permissions: list[str], object, merge: bool = False):
+def set_permissions_for_object(permissions: list[str], object, *, merge: bool = False):
"""
Set permissions for an object. The permissions are given as a list of strings
in the format "action_modelname", e.g. "view_document".
pass
-def check_sanity(progress=False) -> SanityCheckMessages:
+def check_sanity(*, progress=False) -> SanityCheckMessages:
messages = SanityCheckMessages()
present_files = {
def set_correspondent(
sender,
document: Document,
+ *,
logging_group=None,
classifier: DocumentClassifier | None = None,
replace=False,
def set_document_type(
sender,
document: Document,
+ *,
logging_group=None,
classifier: DocumentClassifier | None = None,
replace=False,
def set_tags(
sender,
document: Document,
+ *,
logging_group=None,
classifier: DocumentClassifier | None = None,
replace=False,
def set_storage_path(
sender,
document: Document,
+ *,
logging_group=None,
classifier: DocumentClassifier | None = None,
replace=False,
writer.commit(optimize=True)
-def index_reindex(progress_bar_disable=False):
+def index_reindex(*, progress_bar_disable=False):
documents = Document.objects.all()
ix = index.open_index(recreate=True)
self,
query: list,
reference_predicate: Callable[[DocumentWrapper], bool],
+ *,
match_nothing_ok=False,
):
"""
metadata_document_id = self.doc1.id
user = User.objects.create(username="test_user")
- result = bulk_edit.merge(doc_ids, None, False, user)
+ result = bulk_edit.merge(
+ doc_ids,
+ metadata_document_id=None,
+ delete_originals=False,
+ user=user,
+ )
expected_filename = (
f"{'_'.join([str(doc_id) for doc_id in doc_ids])[:100]}_merged.pdf"
doc_ids = [self.doc2.id]
pages = [[1, 2], [3]]
user = User.objects.create(username="test_user")
- result = bulk_edit.split(doc_ids, pages, False, user)
+ result = bulk_edit.split(doc_ids, pages, delete_originals=False, user=user)
self.assertEqual(mock_consume_file.call_count, 2)
consume_file_args, _ = mock_consume_file.call_args
self.assertEqual(consume_file_args[1].title, "B (split 2)")
raise Exception("Generic exception.")
-def fake_magic_from_file(file, mime=False):
+def fake_magic_from_file(file, *, mime=False):
if mime:
if file.name.startswith("invalid_pdf"):
return "application/octet-stream"
super().setUp()
# all tests run without permission criteria, so has_no_owner query will always
# be appended.
- self.has_no_owner = query.Or([query.Term("has_owner", False)])
+ self.has_no_owner = query.Or([query.Term("has_owner", text=False)])
def _get_testset__id__in(self, param, field):
return (
def test_get_permission_criteria(self):
# tests contains tuples of user instances and the expected filter
tests = (
- (None, [query.Term("has_owner", False)]),
+ (None, [query.Term("has_owner", text=False)]),
(User(42, username="foo", is_superuser=True), []),
(
User(42, username="foo", is_superuser=False),
[
- query.Term("has_owner", False),
+ query.Term("has_owner", text=False),
query.Term("owner_id", 42),
query.Term("viewer_id", "42"),
],
else:
print("Consumed a perfectly valid file.") # noqa: T201
- def slow_write_file(self, target, incomplete=False):
+ def slow_write_file(self, target, *, incomplete=False):
with open(self.sample_file, "rb") as f:
pdf_bytes = f.read()
return manifest
- def test_exporter(self, use_filename_format=False):
+ def test_exporter(self, *, use_filename_format=False):
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"),
match_algorithm: str,
should_match: Iterable[str],
no_match: Iterable[str],
+ *,
case_sensitive: bool = False,
):
for klass in (Tag, Correspondent, DocumentType):
strategy_class = ArchiveOnlyStrategy
with zipfile.ZipFile(temp.name, "w", compression) as zipf:
- strategy = strategy_class(zipf, follow_filename_format)
+ strategy = strategy_class(zipf, follow_formatting=follow_filename_format)
for document in documents:
strategy.add_document(document)
)
-def serve_file(doc: Document, use_archive: bool, disposition: str):
+def serve_file(*, doc: Document, use_archive: bool, disposition: str):
if use_archive:
file_handle = doc.archive_file
filename = doc.get_public_filename(archive=True)
).first()
if authenticator is not None:
delete_and_cleanup(request, authenticator)
- return Response(True)
+ return Response(data=True)
else:
return HttpResponseNotFound("TOTP not found")
).first()
if authenticator is not None:
delete_and_cleanup(request, authenticator)
- return Response(True)
+ return Response(data=True)
else:
return HttpResponseNotFound("TOTP not found")
return {"seen": False}
def post_consume(self, M: MailBox, message_uid: str, parameter: str):
- M.flag(message_uid, [MailMessageFlags.SEEN], True)
+ M.flag(message_uid, [MailMessageFlags.SEEN], value=True)
class MoveMailAction(BaseMailAction):
return {"flagged": False}
def post_consume(self, M: MailBox, message_uid: str, parameter: str):
- M.flag(message_uid, [MailMessageFlags.FLAGGED], True)
+ M.flag(message_uid, [MailMessageFlags.FLAGGED], value=True)
class TagMailAction(BaseMailAction):
A mail action that tags mails after processing.
"""
- def __init__(self, parameter: str, supports_gmail_labels: bool):
+ def __init__(self, parameter: str, *, supports_gmail_labels: bool):
# The custom tag should look like "apple:<color>"
if "apple:" in parameter.lower():
_, self.color = parameter.split(":")
M.flag(
message_uid,
set(itertools.chain(*APPLE_MAIL_TAG_COLORS.values())),
- False,
+ value=False,
)
# Set new $MailFlagBits
- M.flag(message_uid, APPLE_MAIL_TAG_COLORS.get(self.color), True)
+ M.flag(message_uid, APPLE_MAIL_TAG_COLORS.get(self.color), value=True)
# Set the general \Flagged
# This defaults to the "red" flag in AppleMail and
# "stars" in Thunderbird or GMail
- M.flag(message_uid, [MailMessageFlags.FLAGGED], True)
+ M.flag(message_uid, [MailMessageFlags.FLAGGED], value=True)
elif self.keyword:
- M.flag(message_uid, [self.keyword], True)
+ M.flag(message_uid, [self.keyword], value=True)
else:
raise MailError("No keyword specified.")
mailbox_login(M, account)
M.folder.set(rule.folder)
- action = get_rule_action(rule, supports_gmail_labels)
+ action = get_rule_action(rule, supports_gmail_labels=supports_gmail_labels)
try:
action.post_consume(M, message_uid, rule.action_parameter)
except errors.ImapToolsError:
).delay()
-def get_rule_action(rule: MailRule, supports_gmail_labels: bool) -> BaseMailAction:
+def get_rule_action(rule: MailRule, *, supports_gmail_labels: bool) -> BaseMailAction:
"""
Returns a BaseMailAction instance for the given rule.
"""
elif rule.action == MailRule.MailAction.MARK_READ:
return MarkReadMailAction()
elif rule.action == MailRule.MailAction.TAG:
- return TagMailAction(rule.action_parameter, supports_gmail_labels)
+ return TagMailAction(
+ rule.action_parameter,
+ supports_gmail_labels=supports_gmail_labels,
+ )
else:
raise NotImplementedError("Unknown action.") # pragma: no cover
-def make_criterias(rule: MailRule, supports_gmail_labels: bool):
+def make_criterias(rule: MailRule, *, supports_gmail_labels: bool):
"""
Returns criteria to be applied to MailBox.fetch for the given rule.
"""
if rule.filter_body:
criterias["body"] = rule.filter_body
- rule_query = get_rule_action(rule, supports_gmail_labels).get_criteria()
+ rule_query = get_rule_action(
+ rule,
+ supports_gmail_labels=supports_gmail_labels,
+ ).get_criteria()
if isinstance(rule_query, dict):
if len(rule_query) or len(criterias):
return AND(**rule_query, **criterias)
total_processed_files += self._handle_mail_rule(
M,
rule,
- supports_gmail_labels,
+ supports_gmail_labels=supports_gmail_labels,
)
except Exception as e:
self.log.exception(
self,
M: MailBox,
rule: MailRule,
+ *,
supports_gmail_labels: bool,
):
folders = [rule.folder]
f"does not exist in account {rule.account}",
) from err
- criterias = make_criterias(rule, supports_gmail_labels)
+ criterias = make_criterias(rule, supports_gmail_labels=supports_gmail_labels)
self.log.debug(
f"Rule {rule}: Searching folder with criteria {criterias}",
if username != self.USERNAME or access_token != self.ACCESS_TOKEN:
raise MailboxLoginError("BAD", "OK")
- def fetch(self, criteria, mark_seen, charset="", bulk=True):
+ def fetch(self, criteria, mark_seen, charset="", *, bulk=True):
msg = self.messages
criteria = str(criteria).strip("()").split(" ")
raise Exception
-def fake_magic_from_buffer(buffer, mime=False):
+def fake_magic_from_buffer(buffer, *, mime=False):
if mime:
if "PDF" in str(buffer):
return "application/pdf"
def create_message(
self,
+ *,
attachments: int | list[_AttachmentDef] = 1,
body: str = "",
subject: str = "the subject",
)
self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
- self.assertEqual(len(self.mailMocker.bogus_mailbox.fetch("UNSEEN", False)), 2)
+ self.assertEqual(
+ len(self.mailMocker.bogus_mailbox.fetch("UNSEEN", mark_seen=False)),
+ 2,
+ )
self.mail_account_handler.handle_mail_account(account)
self.mailMocker.apply_mail_actions()
- self.assertEqual(len(self.mailMocker.bogus_mailbox.fetch("UNSEEN", False)), 0)
+ self.assertEqual(
+ len(self.mailMocker.bogus_mailbox.fetch("UNSEEN", mark_seen=False)),
+ 0,
+ )
self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
def test_handle_mail_account_delete(self):
self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
self.assertEqual(
- len(self.mailMocker.bogus_mailbox.fetch("UNFLAGGED", False)),
+ len(self.mailMocker.bogus_mailbox.fetch("UNFLAGGED", mark_seen=False)),
2,
)
self.mailMocker.apply_mail_actions()
self.assertEqual(
- len(self.mailMocker.bogus_mailbox.fetch("UNFLAGGED", False)),
+ len(self.mailMocker.bogus_mailbox.fetch("UNFLAGGED", mark_seen=False)),
1,
)
self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
self.assertEqual(
- len(self.mailMocker.bogus_mailbox.fetch("UNKEYWORD processed", False)),
+ len(
+ self.mailMocker.bogus_mailbox.fetch(
+ "UNKEYWORD processed",
+ mark_seen=False,
+ ),
+ ),
2,
)
self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
self.assertEqual(
- len(self.mailMocker.bogus_mailbox.fetch("UNKEYWORD processed", False)),
+ len(
+ self.mailMocker.bogus_mailbox.fetch(
+ "UNKEYWORD processed",
+ mark_seen=False,
+ ),
+ ),
0,
)
self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
criteria = NOT(gmail_label="processed")
- self.assertEqual(len(self.mailMocker.bogus_mailbox.fetch(criteria, False)), 2)
+ self.assertEqual(
+ len(self.mailMocker.bogus_mailbox.fetch(criteria, mark_seen=False)),
+ 2,
+ )
self.mail_account_handler.handle_mail_account(account)
self.mailMocker.apply_mail_actions()
- self.assertEqual(len(self.mailMocker.bogus_mailbox.fetch(criteria, False)), 0)
+ self.assertEqual(
+ len(self.mailMocker.bogus_mailbox.fetch(criteria, mark_seen=False)),
+ 0,
+ )
self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
def test_tag_mail_action_applemail_wrong_input(self):
MailError,
TagMailAction,
"apple:black",
- False,
+ supports_gmail_labels=False,
)
def test_handle_mail_account_tag_applemail(self):
self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
self.assertEqual(
- len(self.mailMocker.bogus_mailbox.fetch("UNFLAGGED", False)),
+ len(self.mailMocker.bogus_mailbox.fetch("UNFLAGGED", mark_seen=False)),
2,
)
self.mailMocker.apply_mail_actions()
self.assertEqual(
- len(self.mailMocker.bogus_mailbox.fetch("UNFLAGGED", False)),
+ len(self.mailMocker.bogus_mailbox.fetch("UNFLAGGED", mark_seen=False)),
0,
)
self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
self.mailMocker._queue_consumption_tasks_mock.assert_not_called()
- self.assertEqual(len(self.mailMocker.bogus_mailbox.fetch("UNSEEN", False)), 2)
+ self.assertEqual(
+ len(self.mailMocker.bogus_mailbox.fetch("UNSEEN", mark_seen=False)),
+ 2,
+ )
self.mail_account_handler.handle_mail_account(account)
self.mailMocker.apply_mail_actions()
self.assertEqual(self.mailMocker._queue_consumption_tasks_mock.call_count, 2)
- self.assertEqual(len(self.mailMocker.bogus_mailbox.fetch("UNSEEN", False)), 0)
+ self.assertEqual(
+ len(self.mailMocker.bogus_mailbox.fetch("UNSEEN", mark_seen=False)),
+ 0,
+ )
self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
def test_auth_plain_fallback_fails_still(self):
self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
self.assertEqual(self.mailMocker._queue_consumption_tasks_mock.call_count, 0)
- self.assertEqual(len(self.mailMocker.bogus_mailbox.fetch("UNSEEN", False)), 2)
+ self.assertEqual(
+ len(self.mailMocker.bogus_mailbox.fetch("UNSEEN", mark_seen=False)),
+ 2,
+ )
self.mail_account_handler.handle_mail_account(account)
self.mailMocker.apply_mail_actions()
self.assertEqual(self.mailMocker._queue_consumption_tasks_mock.call_count, 2)
- self.assertEqual(len(self.mailMocker.bogus_mailbox.fetch("UNSEEN", False)), 0)
+ self.assertEqual(
+ len(self.mailMocker.bogus_mailbox.fetch("UNSEEN", mark_seen=False)),
+ 0,
+ )
self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
def test_disabled_rule(self):
self.mailMocker.apply_mail_actions()
self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
- self.assertEqual(len(self.mailMocker.bogus_mailbox.fetch("UNSEEN", False)), 2)
+ self.assertEqual(
+ len(self.mailMocker.bogus_mailbox.fetch("UNSEEN", mark_seen=False)),
+ 2,
+ )
self.mail_account_handler.handle_mail_account(account)
self.mailMocker.apply_mail_actions()
self.assertEqual(
- len(self.mailMocker.bogus_mailbox.fetch("UNSEEN", False)),
+ len(self.mailMocker.bogus_mailbox.fetch("UNSEEN", mark_seen=False)),
2,
) # still 2
mime_type,
output_file,
sidecar_file,
+ *,
safe_fallback=False,
):
if TYPE_CHECKING: