]> git.ipfire.org Git - thirdparty/paperless-ngx.git/commitdiff
Expire the scheduled tasks shortly before a new one will be added to the queue by default
authorTrenton H <797416+stumpylog@users.noreply.github.com>
Fri, 3 Feb 2023 22:41:10 +0000 (14:41 -0800)
committerTrenton H <797416+stumpylog@users.noreply.github.com>
Wed, 8 Feb 2023 16:18:11 +0000 (08:18 -0800)
src/paperless/settings.py
src/paperless/tests/test_settings.py

index cf119ea8a0ad6520cfe449c6e8da951463d4d1e4..fcd933cfd43a66872878c0a23d11d5d7720a4d5a 100644 (file)
@@ -109,6 +109,16 @@ def _parse_redis_url(env_redis: Optional[str]) -> Tuple[str]:
 
 
 def _parse_beat_schedule() -> Dict:
+    """
+    Configures the scheduled tasks, according to default or
+    environment variables.  Task expiration is configured so the task will
+    expire (and not run) shortly before the default schedule would put another
+    instance of the same task into the queue.
+
+
+    https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html#beat-entries
+    https://docs.celeryq.dev/en/latest/userguide/calling.html#expiration
+    """
     schedule = {}
     tasks = [
         {
@@ -117,6 +127,11 @@ def _parse_beat_schedule() -> Dict:
             # Default every ten minutes
             "env_default": "*/10 * * * *",
             "task": "paperless_mail.tasks.process_mail_accounts",
+            "options": {
+                # 1 minute before default schedule sends again
+                "expires": 9.0
+                * 60.0,
+            },
         },
         {
             "name": "Train the classifier",
@@ -124,6 +139,11 @@ def _parse_beat_schedule() -> Dict:
             # Default hourly at 5 minutes past the hour
             "env_default": "5 */1 * * *",
             "task": "documents.tasks.train_classifier",
+            "options": {
+                # 1 minute before default schedule sends again
+                "expires": 59.0
+                * 60.0,
+            },
         },
         {
             "name": "Optimize the index",
@@ -131,6 +151,12 @@ def _parse_beat_schedule() -> Dict:
             # Default daily at midnight
             "env_default": "0 0 * * *",
             "task": "documents.tasks.index_optimize",
+            "options": {
+                # 1 hour before default schedule sends again
+                "expires": 23.0
+                * 60.0
+                * 60.0,
+            },
         },
         {
             "name": "Perform sanity check",
@@ -138,6 +164,13 @@ def _parse_beat_schedule() -> Dict:
             # Default Sunday at 00:30
             "env_default": "30 0 * * sun",
             "task": "documents.tasks.sanity_check",
+            "options": {
+                # 7 hours before default (weekly) schedule sends again
+                "expires": 7.0
+                * 23.0
+                * 60.0
+                * 60.0,
+            },
         },
     ]
     for task in tasks:
@@ -151,9 +184,11 @@ def _parse_beat_schedule() -> Dict:
         #   - five time-and-date fields
         #   - separated by at least one blank
         minute, hour, day_month, month, day_week = value.split(" ")
+
         schedule[task["name"]] = {
             "task": task["task"],
             "schedule": crontab(minute, hour, day_week, day_month, month),
+            "options": task["options"],
         }
 
     return schedule
@@ -561,22 +596,21 @@ LOGGING = {
 # Task queue                                                                  #
 ###############################################################################
 
-TASK_WORKERS = __get_int("PAPERLESS_TASK_WORKERS", 1)
-
-WORKER_TIMEOUT: Final[int] = __get_int("PAPERLESS_WORKER_TIMEOUT", 1800)
+# https://docs.celeryq.dev/en/stable/userguide/configuration.html
 
 CELERY_BROKER_URL = _CELERY_REDIS_URL
 CELERY_TIMEZONE = TIME_ZONE
 
 CELERY_WORKER_HIJACK_ROOT_LOGGER = False
-CELERY_WORKER_CONCURRENCY = TASK_WORKERS
+CELERY_WORKER_CONCURRENCY: Final[int] = __get_int("PAPERLESS_TASK_WORKERS", 1)
+TASK_WORKERS = CELERY_WORKER_CONCURRENCY
 CELERY_WORKER_MAX_TASKS_PER_CHILD = 1
 CELERY_WORKER_SEND_TASK_EVENTS = True
-
+CELERY_TASK_SEND_SENT_EVENT = True
 CELERY_SEND_TASK_SENT_EVENT = True
 
 CELERY_TASK_TRACK_STARTED = True
-CELERY_TASK_TIME_LIMIT = WORKER_TIMEOUT
+CELERY_TASK_TIME_LIMIT: Final[int] = __get_int("PAPERLESS_WORKER_TIMEOUT", 1800)
 
 CELERY_RESULT_EXTENDED = True
 CELERY_RESULT_BACKEND = "django-db"
@@ -608,7 +642,7 @@ def default_threads_per_worker(task_workers) -> int:
 
 THREADS_PER_WORKER = os.getenv(
     "PAPERLESS_THREADS_PER_WORKER",
-    default_threads_per_worker(TASK_WORKERS),
+    default_threads_per_worker(CELERY_WORKER_CONCURRENCY),
 )
 
 ###############################################################################
index f6d25f6fd79e67f5f59a662b088442a5d7b408de..a85f0e06ae8dbf4eba04da3290863684c8b7cabd 100644 (file)
@@ -149,6 +149,11 @@ class TestRedisSocketConversion(TestCase):
 
 
 class TestCeleryScheduleParsing(TestCase):
+    MAIL_EXPIRE_TIME = 9.0 * 60.0
+    CLASSIFIER_EXPIRE_TIME = 59.0 * 60.0
+    INDEX_EXPIRE_TIME = 23.0 * 60.0 * 60.0
+    SANITY_EXPIRE_TIME = 7.0 * 23.0 * 60.0 * 60.0
+
     def test_schedule_configuration_default(self):
         """
         GIVEN:
@@ -165,18 +170,22 @@ class TestCeleryScheduleParsing(TestCase):
                 "Check all e-mail accounts": {
                     "task": "paperless_mail.tasks.process_mail_accounts",
                     "schedule": crontab(minute="*/10"),
+                    "options": {"expires": self.MAIL_EXPIRE_TIME},
                 },
                 "Train the classifier": {
                     "task": "documents.tasks.train_classifier",
                     "schedule": crontab(minute="5", hour="*/1"),
+                    "options": {"expires": self.CLASSIFIER_EXPIRE_TIME},
                 },
                 "Optimize the index": {
                     "task": "documents.tasks.index_optimize",
                     "schedule": crontab(minute=0, hour=0),
+                    "options": {"expires": self.INDEX_EXPIRE_TIME},
                 },
                 "Perform sanity check": {
                     "task": "documents.tasks.sanity_check",
                     "schedule": crontab(minute=30, hour=0, day_of_week="sun"),
+                    "options": {"expires": self.SANITY_EXPIRE_TIME},
                 },
             },
             schedule,
@@ -203,18 +212,22 @@ class TestCeleryScheduleParsing(TestCase):
                 "Check all e-mail accounts": {
                     "task": "paperless_mail.tasks.process_mail_accounts",
                     "schedule": crontab(minute="*/50", day_of_week="mon"),
+                    "options": {"expires": self.MAIL_EXPIRE_TIME},
                 },
                 "Train the classifier": {
                     "task": "documents.tasks.train_classifier",
                     "schedule": crontab(minute="5", hour="*/1"),
+                    "options": {"expires": self.CLASSIFIER_EXPIRE_TIME},
                 },
                 "Optimize the index": {
                     "task": "documents.tasks.index_optimize",
                     "schedule": crontab(minute=0, hour=0),
+                    "options": {"expires": self.INDEX_EXPIRE_TIME},
                 },
                 "Perform sanity check": {
                     "task": "documents.tasks.sanity_check",
                     "schedule": crontab(minute=30, hour=0, day_of_week="sun"),
+                    "options": {"expires": self.SANITY_EXPIRE_TIME},
                 },
             },
             schedule,
@@ -238,14 +251,17 @@ class TestCeleryScheduleParsing(TestCase):
                 "Check all e-mail accounts": {
                     "task": "paperless_mail.tasks.process_mail_accounts",
                     "schedule": crontab(minute="*/10"),
+                    "options": {"expires": self.MAIL_EXPIRE_TIME},
                 },
                 "Train the classifier": {
                     "task": "documents.tasks.train_classifier",
                     "schedule": crontab(minute="5", hour="*/1"),
+                    "options": {"expires": self.CLASSIFIER_EXPIRE_TIME},
                 },
                 "Perform sanity check": {
                     "task": "documents.tasks.sanity_check",
                     "schedule": crontab(minute=30, hour=0, day_of_week="sun"),
+                    "options": {"expires": self.SANITY_EXPIRE_TIME},
                 },
             },
             schedule,