def _parse_beat_schedule() -> Dict:
+ """
+ Configures the scheduled tasks, according to default or
+ environment variables. Task expiration is configured so the task will
+ expire (and not run), shortly before the default frequency will put another
+ of the same task into the queue
+
+
+ https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html#beat-entries
+ https://docs.celeryq.dev/en/latest/userguide/calling.html#expiration
+ """
schedule = {}
tasks = [
{
# Default every ten minutes
"env_default": "*/10 * * * *",
"task": "paperless_mail.tasks.process_mail_accounts",
+ "options": {
+ # 9 minutes in seconds: expires 1 minute before the default 10-minute schedule sends again
+ "expires": 9.0
+ * 60.0,
+ },
},
{
"name": "Train the classifier",
# Default hourly at 5 minutes past the hour
"env_default": "5 */1 * * *",
"task": "documents.tasks.train_classifier",
+ "options": {
+ # 59 minutes in seconds: expires 1 minute before the default hourly schedule sends again
+ "expires": 59.0
+ * 60.0,
+ },
},
{
"name": "Optimize the index",
# Default daily at midnight
"env_default": "0 0 * * *",
"task": "documents.tasks.index_optimize",
+ "options": {
+ # 23 hours in seconds: expires 1 hour before the default daily schedule sends again
+ "expires": 23.0
+ * 60.0
+ * 60.0,
+ },
},
{
"name": "Perform sanity check",
# Default Sunday at 00:30
"env_default": "30 0 * * sun",
"task": "documents.tasks.sanity_check",
+ "options": {
+ # NOTE(review): 7 * 23 h = 161 h, i.e. 7 hours (not 1 hour) before the weekly default sends again; confirm intent
+ "expires": 7.0
+ * 23.0
+ * 60.0
+ * 60.0,
+ },
},
]
for task in tasks:
# - five time-and-date fields
# - separated by at least one blank
minute, hour, day_month, month, day_week = value.split(" ")
+
schedule[task["name"]] = {
"task": task["task"],
"schedule": crontab(minute, hour, day_week, day_month, month),
+ "options": task["options"],
}
return schedule
# Task queue #
###############################################################################
-TASK_WORKERS = __get_int("PAPERLESS_TASK_WORKERS", 1)
-
-WORKER_TIMEOUT: Final[int] = __get_int("PAPERLESS_WORKER_TIMEOUT", 1800)
+# Celery settings reference: https://docs.celeryq.dev/en/stable/userguide/configuration.html
CELERY_BROKER_URL = _CELERY_REDIS_URL
CELERY_TIMEZONE = TIME_ZONE
CELERY_WORKER_HIJACK_ROOT_LOGGER = False
-CELERY_WORKER_CONCURRENCY = TASK_WORKERS
+CELERY_WORKER_CONCURRENCY: Final[int] = __get_int("PAPERLESS_TASK_WORKERS", 1)
+TASK_WORKERS = CELERY_WORKER_CONCURRENCY
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1
CELERY_WORKER_SEND_TASK_EVENTS = True
-
+CELERY_TASK_SEND_SENT_EVENT = True  # NOTE(review): the next assignment sets the similarly named CELERY_SEND_TASK_SENT_EVENT; confirm which spelling Celery reads
CELERY_SEND_TASK_SENT_EVENT = True
CELERY_TASK_TRACK_STARTED = True
-CELERY_TASK_TIME_LIMIT = WORKER_TIMEOUT
+CELERY_TASK_TIME_LIMIT: Final[int] = __get_int("PAPERLESS_WORKER_TIMEOUT", 1800)
CELERY_RESULT_EXTENDED = True
CELERY_RESULT_BACKEND = "django-db"
THREADS_PER_WORKER = os.getenv(
"PAPERLESS_THREADS_PER_WORKER",
- default_threads_per_worker(TASK_WORKERS),
+ default_threads_per_worker(CELERY_WORKER_CONCURRENCY),  # NOTE(review): os.getenv returns a str when the variable is set; confirm consumers accept both a str and this default's type
)
###############################################################################
class TestCeleryScheduleParsing(TestCase):
+ MAIL_EXPIRE_TIME = 9.0 * 60.0
+ CLASSIFIER_EXPIRE_TIME = 59.0 * 60.0
+ INDEX_EXPIRE_TIME = 23.0 * 60.0 * 60.0
+ SANITY_EXPIRE_TIME = 7.0 * 23.0 * 60.0 * 60.0
+
def test_schedule_configuration_default(self):
"""
GIVEN:
"Check all e-mail accounts": {
"task": "paperless_mail.tasks.process_mail_accounts",
"schedule": crontab(minute="*/10"),
+ "options": {"expires": self.MAIL_EXPIRE_TIME},
},
"Train the classifier": {
"task": "documents.tasks.train_classifier",
"schedule": crontab(minute="5", hour="*/1"),
+ "options": {"expires": self.CLASSIFIER_EXPIRE_TIME},
},
"Optimize the index": {
"task": "documents.tasks.index_optimize",
"schedule": crontab(minute=0, hour=0),
+ "options": {"expires": self.INDEX_EXPIRE_TIME},
},
"Perform sanity check": {
"task": "documents.tasks.sanity_check",
"schedule": crontab(minute=30, hour=0, day_of_week="sun"),
+ "options": {"expires": self.SANITY_EXPIRE_TIME},
},
},
schedule,
"Check all e-mail accounts": {
"task": "paperless_mail.tasks.process_mail_accounts",
"schedule": crontab(minute="*/50", day_of_week="mon"),
+ "options": {"expires": self.MAIL_EXPIRE_TIME},
},
"Train the classifier": {
"task": "documents.tasks.train_classifier",
"schedule": crontab(minute="5", hour="*/1"),
+ "options": {"expires": self.CLASSIFIER_EXPIRE_TIME},
},
"Optimize the index": {
"task": "documents.tasks.index_optimize",
"schedule": crontab(minute=0, hour=0),
+ "options": {"expires": self.INDEX_EXPIRE_TIME},
},
"Perform sanity check": {
"task": "documents.tasks.sanity_check",
"schedule": crontab(minute=30, hour=0, day_of_week="sun"),
+ "options": {"expires": self.SANITY_EXPIRE_TIME},
},
},
schedule,
"Check all e-mail accounts": {
"task": "paperless_mail.tasks.process_mail_accounts",
"schedule": crontab(minute="*/10"),
+ "options": {"expires": self.MAIL_EXPIRE_TIME},
},
"Train the classifier": {
"task": "documents.tasks.train_classifier",
"schedule": crontab(minute="5", hour="*/1"),
+ "options": {"expires": self.CLASSIFIER_EXPIRE_TIME},
},
"Perform sanity check": {
"task": "documents.tasks.sanity_check",
"schedule": crontab(minute=30, hour=0, day_of_week="sun"),
+ "options": {"expires": self.SANITY_EXPIRE_TIME},
},
},
schedule,