]> git.ipfire.org Git - thirdparty/paperless-ngx.git/commitdiff
Switches task serialization over to pickle format
authorTrenton H <797416+stumpylog@users.noreply.github.com>
Tue, 22 Nov 2022 17:59:59 +0000 (09:59 -0800)
committerTrenton H <797416+stumpylog@users.noreply.github.com>
Mon, 12 Dec 2022 23:37:01 +0000 (15:37 -0800)
src/documents/migrations/1028_remove_paperlesstask_task_args_and_more.py [new file with mode: 0644]
src/documents/models.py
src/documents/signals/handlers.py
src/documents/tests/test_api.py
src/documents/tests/test_task_signals.py
src/paperless/settings.py

diff --git a/src/documents/migrations/1028_remove_paperlesstask_task_args_and_more.py b/src/documents/migrations/1028_remove_paperlesstask_task_args_and_more.py
new file mode 100644 (file)
index 0000000..83f3ead
--- /dev/null
@@ -0,0 +1,21 @@
+# Generated by Django 4.1.3 on 2022-11-22 17:50
+
+from django.db import migrations
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ("documents", "1027_remove_paperlesstask_attempted_task_and_more"),
+    ]
+
+    operations = [
+        migrations.RemoveField(
+            model_name="paperlesstask",
+            name="task_args",
+        ),
+        migrations.RemoveField(
+            model_name="paperlesstask",
+            name="task_kwargs",
+        ),
+    ]
index 1ee6dfedb833f3366a36e98a289f0c6e702ef678..a59340e5bcc54bd106ea429447dd2605a5d6d066 100644 (file)
@@ -560,20 +560,6 @@ class PaperlessTask(models.Model):
         help_text=_("Name of the Task which was run"),
     )
 
-    task_args = models.JSONField(
-        null=True,
-        verbose_name=_("Task Positional Arguments"),
-        help_text=_(
-            "JSON representation of the positional arguments used with the task",
-        ),
-    )
-    task_kwargs = models.JSONField(
-        null=True,
-        verbose_name=_("Task Named Arguments"),
-        help_text=_(
-            "JSON representation of the named arguments used with the task",
-        ),
-    )
     status = models.CharField(
         max_length=30,
         default=states.PENDING,
index c28ea8e2b5ee65b4086374a58775a75e47443b46..4c2554f02ec395912439634bd989aa23013b9377 100644 (file)
@@ -1,7 +1,6 @@
 import logging
 import os
 import shutil
-from ast import literal_eval
 from pathlib import Path
 
 from celery import states
@@ -521,25 +520,24 @@ def add_to_index(sender, document, **kwargs):
 def before_task_publish_handler(sender=None, headers=None, body=None, **kwargs):
     """
     Creates the PaperlessTask object in a pending state.  This is sent before
-    the task reaches the broker, but
+    the task reaches the broker, but before it begins executing on a worker.
 
     https://docs.celeryq.dev/en/stable/userguide/signals.html#before-task-publish
 
+    https://docs.celeryq.dev/en/stable/internals/protocol.html#version-2
+
     """
     if "task" not in headers or headers["task"] != "documents.tasks.consume_file":
         # Assumption: this is only ever a v2 message
         return
 
     try:
-        task_file_name = ""
-        if headers["kwargsrepr"] is not None:
-            task_kwargs = literal_eval(headers["kwargsrepr"])
-            if "override_filename" in task_kwargs:
-                task_file_name = task_kwargs["override_filename"]
-        else:
-            task_kwargs = None
+        task_args = body[0]
+        task_kwargs = body[1]
 
-        task_args = literal_eval(headers["argsrepr"])
+        task_file_name = ""
+        if "override_filename" in task_kwargs:
+            task_file_name = task_kwargs["override_filename"]
 
         # Nothing was found, report the task first argument
         if not len(task_file_name):
@@ -552,8 +550,6 @@ def before_task_publish_handler(sender=None, headers=None, body=None, **kwargs):
             status=states.PENDING,
             task_file_name=task_file_name,
             task_name=headers["task"],
-            task_args=task_args,
-            task_kwargs=task_kwargs,
             result=None,
             date_created=timezone.now(),
             date_started=None,
@@ -562,7 +558,7 @@ def before_task_publish_handler(sender=None, headers=None, body=None, **kwargs):
     except Exception as e:  # pragma: no cover
         # Don't let an exception in the signal handlers prevent
         # a document from being consumed.
-        logger.error(f"Creating PaperlessTask failed: {e}")
+        logger.error(f"Creating PaperlessTask failed: {e}", exc_info=True)
 
 
 @task_prerun.connect
@@ -584,7 +580,7 @@ def task_prerun_handler(sender=None, task_id=None, task=None, **kwargs):
     except Exception as e:  # pragma: no cover
         # Don't let an exception in the signal handlers prevent
         # a document from being consumed.
-        logger.error(f"Setting PaperlessTask started failed: {e}")
+        logger.error(f"Setting PaperlessTask started failed: {e}", exc_info=True)
 
 
 @task_postrun.connect
@@ -607,4 +603,4 @@ def task_postrun_handler(
     except Exception as e:  # pragma: no cover
         # Don't let an exception in the signal handlers prevent
         # a document from being consumed.
-        logger.error(f"Updating PaperlessTask failed: {e}")
+        logger.error(f"Updating PaperlessTask failed: {e}", exc_info=True)
index c9d8aefc25292dbc162c63624a3f8b2704fa79ed..0f890249cb24b999642c5f3c556531517b85be81 100644 (file)
@@ -3043,16 +3043,6 @@ class TestTasks(APITestCase):
             task_file_name="test.pdf",
             task_name="documents.tasks.some_task",
             status=celery.states.SUCCESS,
-            task_args=("/tmp/paperless/paperless-upload-5iq7skzc",),
-            task_kwargs={
-                "override_filename": "test.pdf",
-                "override_title": None,
-                "override_correspondent_id": None,
-                "override_document_type_id": None,
-                "override_tag_ids": None,
-                "task_id": "466e8fe7-7193-4698-9fff-72f0340e2082",
-                "override_created": None,
-            },
         )
 
         response = self.client.get(self.ENDPOINT)
@@ -3079,8 +3069,6 @@ class TestTasks(APITestCase):
             task_file_name="anothertest.pdf",
             task_name="documents.tasks.some_task",
             status=celery.states.SUCCESS,
-            task_args=("/consume/anothertest.pdf",),
-            task_kwargs={"override_tag_ids": None},
         )
 
         response = self.client.get(self.ENDPOINT)
index 8aafc1f122b101699d8946665c10017c865b65cb..e21879802a214990d06b7d07139dda22ab511311 100644 (file)
@@ -28,6 +28,14 @@ class TestTaskSignalHandler(DirectoriesMixin, TestCase):
         "ignore_result": False,
     }
 
+    BODY_CONSUME = (
+        # args
+        ("/consume/hello-999.pdf",),
+        # kwargs
+        {"override_tag_ids": None},
+        {"callbacks": None, "errbacks": None, "chain": None, "chord": None},
+    )
+
     HEADERS_WEB_UI = {
         "lang": "py",
         "task": "documents.tasks.consume_file",
@@ -47,64 +55,90 @@ class TestTaskSignalHandler(DirectoriesMixin, TestCase):
         "ignore_result": False,
     }
 
-    def util_call_before_task_publish_handler(self, headers_to_use):
+    BODY_WEB_UI = (
+        # args
+        ("/tmp/paperless/paperless-upload-st9lmbvx",),
+        # kwargs
+        {
+            "override_filename": "statement.pdf",
+            "override_title": None,
+            "override_correspondent_id": None,
+            "override_document_type_id": None,
+            "override_tag_ids": None,
+            "task_id": "f5622ca9-3707-4ed0-b418-9680b912572f",
+            "override_created": None,
+        },
+        {"callbacks": None, "errbacks": None, "chain": None, "chord": None},
+    )
+
+    def util_call_before_task_publish_handler(self, headers_to_use, body_to_use):
+        """
+        Simple utility to call the pre-run handle and ensure it created a single task
+        instance
+        """
         self.assertEqual(PaperlessTask.objects.all().count(), 0)
 
-        before_task_publish_handler(headers=headers_to_use)
+        before_task_publish_handler(headers=headers_to_use, body=body_to_use)
 
         self.assertEqual(PaperlessTask.objects.all().count(), 1)
 
     def test_before_task_publish_handler_consume(self):
         """
         GIVEN:
-            - A celery task completed with an exception
+            - A celery task is started via the consume folder
         WHEN:
-            - API call is made to get tasks
+            - Task before publish handler is called
         THEN:
-            - The returned result is the exception info
+            - The task is created and marked as pending
         """
-        self.util_call_before_task_publish_handler(headers_to_use=self.HEADERS_CONSUME)
+        self.util_call_before_task_publish_handler(
+            headers_to_use=self.HEADERS_CONSUME,
+            body_to_use=self.BODY_CONSUME,
+        )
 
         task = PaperlessTask.objects.get()
         self.assertIsNotNone(task)
         self.assertEqual(self.HEADERS_CONSUME["id"], task.task_id)
-        self.assertListEqual(["/consume/hello-999.pdf"], task.task_args)
-        self.assertDictEqual({"override_tag_ids": None}, task.task_kwargs)
         self.assertEqual("hello-999.pdf", task.task_file_name)
         self.assertEqual("documents.tasks.consume_file", task.task_name)
         self.assertEqual(celery.states.PENDING, task.status)
 
     def test_before_task_publish_handler_webui(self):
-
-        self.util_call_before_task_publish_handler(headers_to_use=self.HEADERS_WEB_UI)
+        """
+        GIVEN:
+            - A celery task is started via the web ui
+        WHEN:
+            - Task before publish handler is called
+        THEN:
+            - The task is created and marked as pending
+        """
+        self.util_call_before_task_publish_handler(
+            headers_to_use=self.HEADERS_WEB_UI,
+            body_to_use=self.BODY_WEB_UI,
+        )
 
         task = PaperlessTask.objects.get()
 
         self.assertIsNotNone(task)
 
         self.assertEqual(self.HEADERS_WEB_UI["id"], task.task_id)
-        self.assertListEqual(
-            ["/tmp/paperless/paperless-upload-st9lmbvx"],
-            task.task_args,
-        )
-        self.assertDictEqual(
-            {
-                "override_filename": "statement.pdf",
-                "override_title": None,
-                "override_correspondent_id": None,
-                "override_document_type_id": None,
-                "override_tag_ids": None,
-                "task_id": "f5622ca9-3707-4ed0-b418-9680b912572f",
-                "override_created": None,
-            },
-            task.task_kwargs,
-        )
         self.assertEqual("statement.pdf", task.task_file_name)
         self.assertEqual("documents.tasks.consume_file", task.task_name)
         self.assertEqual(celery.states.PENDING, task.status)
 
     def test_task_prerun_handler(self):
-        self.util_call_before_task_publish_handler(headers_to_use=self.HEADERS_CONSUME)
+        """
+        GIVEN:
+            - A celery task is started via the consume folder
+        WHEN:
+            - Task starts execution
+        THEN:
+            - The task is marked as started
+        """
+        self.util_call_before_task_publish_handler(
+            headers_to_use=self.HEADERS_CONSUME,
+            body_to_use=self.BODY_CONSUME,
+        )
 
         task_prerun_handler(task_id=self.HEADERS_CONSUME["id"])
 
@@ -113,7 +147,18 @@ class TestTaskSignalHandler(DirectoriesMixin, TestCase):
         self.assertEqual(celery.states.STARTED, task.status)
 
     def test_task_postrun_handler(self):
-        self.util_call_before_task_publish_handler(headers_to_use=self.HEADERS_CONSUME)
+        """
+        GIVEN:
+            - A celery task is started via the consume folder
+        WHEN:
+            - Task finished execution
+        THEN:
+            - The task is marked as started
+        """
+        self.util_call_before_task_publish_handler(
+            headers_to_use=self.HEADERS_CONSUME,
+            body_to_use=self.BODY_CONSUME,
+        )
 
         task_postrun_handler(
             task_id=self.HEADERS_CONSUME["id"],
index 40c7a5c3b4e77360b65bb60b4ed80d2f687aa5cb..0bdec474596f74c82c1b8cebdc5de051315071d5 100644 (file)
@@ -526,6 +526,10 @@ CELERY_RESULT_EXTENDED = True
 CELERY_RESULT_BACKEND = "django-db"
 CELERY_CACHE_BACKEND = "default"
 
+# This allows types to stay types through a .delay
+CELERY_TASK_SERIALIZER = "pickle"
+CELERY_ACCEPT_CONTENT = ["application/x-python-serialize"]
+
 CELERY_BEAT_SCHEDULE = {
     # Every ten minutes
     "Check all e-mail accounts": {