import logging
import os
import shutil
-from ast import literal_eval
from pathlib import Path
from celery import states
def before_task_publish_handler(sender=None, headers=None, body=None, **kwargs):
"""
Creates the PaperlessTask object in a pending state. This is sent before
- the task reaches the broker, but
+ the task reaches the broker, but before it begins executing on a worker.
https://docs.celeryq.dev/en/stable/userguide/signals.html#before-task-publish
+ https://docs.celeryq.dev/en/stable/internals/protocol.html#version-2
+
"""
if "task" not in headers or headers["task"] != "documents.tasks.consume_file":
# Assumption: this is only ever a v2 message
return
try:
- task_file_name = ""
- if headers["kwargsrepr"] is not None:
- task_kwargs = literal_eval(headers["kwargsrepr"])
- if "override_filename" in task_kwargs:
- task_file_name = task_kwargs["override_filename"]
- else:
- task_kwargs = None
+ task_args = body[0]
+ task_kwargs = body[1]
- task_args = literal_eval(headers["argsrepr"])
+ task_file_name = ""
+ if "override_filename" in task_kwargs:
+ task_file_name = task_kwargs["override_filename"]
# Nothing was found, report the task first argument
if not len(task_file_name):
status=states.PENDING,
task_file_name=task_file_name,
task_name=headers["task"],
- task_args=task_args,
- task_kwargs=task_kwargs,
result=None,
date_created=timezone.now(),
date_started=None,
except Exception as e: # pragma: no cover
# Don't let an exception in the signal handlers prevent
# a document from being consumed.
- logger.error(f"Creating PaperlessTask failed: {e}")
+ logger.error(f"Creating PaperlessTask failed: {e}", exc_info=True)
@task_prerun.connect
except Exception as e: # pragma: no cover
# Don't let an exception in the signal handlers prevent
# a document from being consumed.
- logger.error(f"Setting PaperlessTask started failed: {e}")
+ logger.error(f"Setting PaperlessTask started failed: {e}", exc_info=True)
@task_postrun.connect
except Exception as e: # pragma: no cover
# Don't let an exception in the signal handlers prevent
# a document from being consumed.
- logger.error(f"Updating PaperlessTask failed: {e}")
+ logger.error(f"Updating PaperlessTask failed: {e}", exc_info=True)
task_file_name="test.pdf",
task_name="documents.tasks.some_task",
status=celery.states.SUCCESS,
- task_args=("/tmp/paperless/paperless-upload-5iq7skzc",),
- task_kwargs={
- "override_filename": "test.pdf",
- "override_title": None,
- "override_correspondent_id": None,
- "override_document_type_id": None,
- "override_tag_ids": None,
- "task_id": "466e8fe7-7193-4698-9fff-72f0340e2082",
- "override_created": None,
- },
)
response = self.client.get(self.ENDPOINT)
task_file_name="anothertest.pdf",
task_name="documents.tasks.some_task",
status=celery.states.SUCCESS,
- task_args=("/consume/anothertest.pdf",),
- task_kwargs={"override_tag_ids": None},
)
response = self.client.get(self.ENDPOINT)
"ignore_result": False,
}
+ BODY_CONSUME = (
+ # args
+ ("/consume/hello-999.pdf",),
+ # kwargs
+ {"override_tag_ids": None},
+ {"callbacks": None, "errbacks": None, "chain": None, "chord": None},
+ )
+
HEADERS_WEB_UI = {
"lang": "py",
"task": "documents.tasks.consume_file",
"ignore_result": False,
}
- def util_call_before_task_publish_handler(self, headers_to_use):
+ BODY_WEB_UI = (
+ # args
+ ("/tmp/paperless/paperless-upload-st9lmbvx",),
+ # kwargs
+ {
+ "override_filename": "statement.pdf",
+ "override_title": None,
+ "override_correspondent_id": None,
+ "override_document_type_id": None,
+ "override_tag_ids": None,
+ "task_id": "f5622ca9-3707-4ed0-b418-9680b912572f",
+ "override_created": None,
+ },
+ {"callbacks": None, "errbacks": None, "chain": None, "chord": None},
+ )
+
+ def util_call_before_task_publish_handler(self, headers_to_use, body_to_use):
+ """
+ Simple utility to call the pre-run handle and ensure it created a single task
+ instance
+ """
self.assertEqual(PaperlessTask.objects.all().count(), 0)
- before_task_publish_handler(headers=headers_to_use)
+ before_task_publish_handler(headers=headers_to_use, body=body_to_use)
self.assertEqual(PaperlessTask.objects.all().count(), 1)
def test_before_task_publish_handler_consume(self):
"""
GIVEN:
- - A celery task completed with an exception
+ - A celery task is started via the consume folder
WHEN:
- - API call is made to get tasks
+ - Task before publish handler is called
THEN:
- - The returned result is the exception info
+ - The task is created and marked as pending
"""
- self.util_call_before_task_publish_handler(headers_to_use=self.HEADERS_CONSUME)
+ self.util_call_before_task_publish_handler(
+ headers_to_use=self.HEADERS_CONSUME,
+ body_to_use=self.BODY_CONSUME,
+ )
task = PaperlessTask.objects.get()
self.assertIsNotNone(task)
self.assertEqual(self.HEADERS_CONSUME["id"], task.task_id)
- self.assertListEqual(["/consume/hello-999.pdf"], task.task_args)
- self.assertDictEqual({"override_tag_ids": None}, task.task_kwargs)
self.assertEqual("hello-999.pdf", task.task_file_name)
self.assertEqual("documents.tasks.consume_file", task.task_name)
self.assertEqual(celery.states.PENDING, task.status)
def test_before_task_publish_handler_webui(self):
-
- self.util_call_before_task_publish_handler(headers_to_use=self.HEADERS_WEB_UI)
+ """
+ GIVEN:
+ - A celery task is started via the web ui
+ WHEN:
+ - Task before publish handler is called
+ THEN:
+ - The task is created and marked as pending
+ """
+ self.util_call_before_task_publish_handler(
+ headers_to_use=self.HEADERS_WEB_UI,
+ body_to_use=self.BODY_WEB_UI,
+ )
task = PaperlessTask.objects.get()
self.assertIsNotNone(task)
self.assertEqual(self.HEADERS_WEB_UI["id"], task.task_id)
- self.assertListEqual(
- ["/tmp/paperless/paperless-upload-st9lmbvx"],
- task.task_args,
- )
- self.assertDictEqual(
- {
- "override_filename": "statement.pdf",
- "override_title": None,
- "override_correspondent_id": None,
- "override_document_type_id": None,
- "override_tag_ids": None,
- "task_id": "f5622ca9-3707-4ed0-b418-9680b912572f",
- "override_created": None,
- },
- task.task_kwargs,
- )
self.assertEqual("statement.pdf", task.task_file_name)
self.assertEqual("documents.tasks.consume_file", task.task_name)
self.assertEqual(celery.states.PENDING, task.status)
def test_task_prerun_handler(self):
- self.util_call_before_task_publish_handler(headers_to_use=self.HEADERS_CONSUME)
+ """
+ GIVEN:
+ - A celery task is started via the consume folder
+ WHEN:
+ - Task starts execution
+ THEN:
+ - The task is marked as started
+ """
+ self.util_call_before_task_publish_handler(
+ headers_to_use=self.HEADERS_CONSUME,
+ body_to_use=self.BODY_CONSUME,
+ )
task_prerun_handler(task_id=self.HEADERS_CONSUME["id"])
self.assertEqual(celery.states.STARTED, task.status)
def test_task_postrun_handler(self):
- self.util_call_before_task_publish_handler(headers_to_use=self.HEADERS_CONSUME)
+ """
+ GIVEN:
+ - A celery task is started via the consume folder
+ WHEN:
+ - Task finished execution
+ THEN:
+ - The task is marked as started
+ """
+ self.util_call_before_task_publish_handler(
+ headers_to_use=self.HEADERS_CONSUME,
+ body_to_use=self.BODY_CONSUME,
+ )
task_postrun_handler(
task_id=self.HEADERS_CONSUME["id"],