]> git.ipfire.org Git - thirdparty/paperless-ngx.git/commitdiff
Incremental llm index update, add scheduled llm index task
authorshamoon <4887959+shamoon@users.noreply.github.com>
Mon, 28 Apr 2025 17:29:07 +0000 (10:29 -0700)
committershamoon <4887959+shamoon@users.noreply.github.com>
Wed, 2 Jul 2025 18:03:59 +0000 (11:03 -0700)
docs/configuration.md
src/documents/management/commands/document_llmindex.py
src/documents/tasks.py
src/paperless/ai/indexing.py
src/paperless/config.py
src/paperless/settings.py
src/paperless/tests/test_ai_indexing.py
src/paperless/tests/test_settings.py

index 07a2be19dffc9d375abbb1e4eff904a15b4ba151..85e2f252651b928abde3d33a77ef689affee55f2 100644 (file)
@@ -1816,3 +1816,10 @@ current backend. This setting is required to be set to use the AI features.
 : The URL to use for the AI backend. This is required for the Ollama backend only.
 
     Defaults to None.
+
+#### [`PAPERLESS_LLM_INDEX_TASK_CRON=<cron expression>`](#PAPERLESS_LLM_INDEX_TASK_CRON) {#PAPERLESS_LLM_INDEX_TASK_CRON}
+
+: Configures the schedule to update the AI embeddings for all documents. Only performed if
+AI is enabled and the LLM embedding backend is set.
+
+    Defaults to `10 2 * * *`, once per day.
index 09ea477c2210c6bfdf6cc09b19af6137dae48cbc..74c5c4d69e885cce0acf858b8a68f7927facdf5f 100644 (file)
@@ -2,20 +2,20 @@ from django.core.management import BaseCommand
 from django.db import transaction
 
 from documents.management.commands.mixins import ProgressBarMixin
-from documents.tasks import llm_index_rebuild
+from documents.tasks import llmindex_index
 
 
 class Command(ProgressBarMixin, BaseCommand):
     help = "Manages the LLM-based vector index for Paperless."
 
     def add_arguments(self, parser):
-        parser.add_argument("command", choices=["rebuild"])
+        parser.add_argument("command", choices=["rebuild", "update"])
         self.add_argument_progress_bar_mixin(parser)
 
     def handle(self, *args, **options):
         self.handle_progress_bar_mixin(**options)
         with transaction.atomic():
-            llm_index_rebuild(
+            llmindex_index(
                 progress_bar_disable=self.no_progress_bar,
                 rebuild=options["command"] == "rebuild",
             )
index 3edde40cb65e841fda94df9132bd3507061b20b4..6861804dcf22fdf2be0b62cf60b5e6ae1f34f85e 100644 (file)
@@ -56,7 +56,7 @@ from documents.signals.handlers import cleanup_document_deletion
 from documents.signals.handlers import run_workflows
 from paperless.ai.indexing import llm_index_add_or_update_document
 from paperless.ai.indexing import llm_index_remove_document
-from paperless.ai.indexing import rebuild_llm_index
+from paperless.ai.indexing import update_llm_index
 from paperless.config import AIConfig
 
 if settings.AUDIT_LOG_ENABLED:
@@ -532,11 +532,14 @@ def check_scheduled_workflows():
                         )
 
 
-def llm_index_rebuild(*, progress_bar_disable=False, rebuild=False):
-    rebuild_llm_index(
-        progress_bar_disable=progress_bar_disable,
-        rebuild=rebuild,
-    )
+@shared_task
+def llmindex_index(*, progress_bar_disable=False, rebuild=False):
+    ai_config = AIConfig()
+    if ai_config.llm_index_enabled():
+        update_llm_index(
+            progress_bar_disable=progress_bar_disable,
+            rebuild=rebuild,
+        )
 
 
 @shared_task
@@ -552,6 +555,6 @@ def remove_document_from_llm_index(document):
 # TODO: schedule to run periodically
 @shared_task
 def rebuild_llm_index_task():
-    from paperless.ai.indexing import rebuild_llm_index
+    from paperless.ai.indexing import update_llm_index
 
-    rebuild_llm_index(rebuild=True)
+    update_llm_index(rebuild=True)
index 2ec4f4925c600ba04c2e6cc676145b124165d7ba..11b8179ee71038dbe1c237e45687b35a3546c5a5 100644 (file)
@@ -8,6 +8,7 @@ from django.conf import settings
 from llama_index.core import Document as LlamaDocument
 from llama_index.core import StorageContext
 from llama_index.core import VectorStoreIndex
+from llama_index.core import load_index_from_storage
 from llama_index.core.node_parser import SimpleNodeParser
 from llama_index.core.retrievers import VectorIndexRetriever
 from llama_index.core.schema import BaseNode
@@ -70,7 +71,7 @@ def build_document_node(document: Document) -> list[BaseNode]:
 
     text = build_llm_index_text(document)
     metadata = {
-        "document_id": document.id,
+        "document_id": str(document.id),
         "title": document.title,
         "tags": [t.name for t in document.tags.all()],
         "correspondent": document.correspondent.name
@@ -81,32 +82,29 @@ def build_document_node(document: Document) -> list[BaseNode]:
         else None,
         "created": document.created.isoformat() if document.created else None,
         "added": document.added.isoformat() if document.added else None,
+        "modified": document.modified.isoformat(),
     }
     doc = LlamaDocument(text=text, metadata=metadata)
     parser = SimpleNodeParser()
     return parser.get_nodes_from_documents([doc])
 
 
-def load_or_build_index(storage_context, embed_model, nodes=None):
+def load_or_build_index(storage_context: StorageContext, embed_model, nodes=None):
     """
     Load an existing VectorStoreIndex if present,
     or build a new one using provided nodes if storage is empty.
     """
     try:
+        return load_index_from_storage(storage_context=storage_context)
+    except ValueError as e:
+        logger.debug("Failed to load index from storage: %s", e)
+        if not nodes:
+            return None
         return VectorStoreIndex(
+            nodes=nodes,
             storage_context=storage_context,
             embed_model=embed_model,
         )
-    except ValueError as e:
-        if "One of nodes, objects, or index_struct must be provided" in str(e):
-            if not nodes:
-                return None
-            return VectorStoreIndex(
-                nodes=nodes,
-                storage_context=storage_context,
-                embed_model=embed_model,
-            )
-        raise
 
 
 def remove_document_docstore_nodes(document: Document, index: VectorStoreIndex):
@@ -125,31 +123,74 @@ def remove_document_docstore_nodes(document: Document, index: VectorStoreIndex):
         index.docstore.delete_document(node_id)
 
 
-def rebuild_llm_index(*, progress_bar_disable=False, rebuild=False):
+def update_llm_index(*, progress_bar_disable=False, rebuild=False):
     """
-    Rebuilds the LLM index from scratch.
+    Rebuild or update the LLM index.
     """
     embed_model = get_embedding_model()
     llama_settings.Settings.embed_model = embed_model
-
     storage_context = get_or_create_storage_context(rebuild=rebuild)
 
     nodes = []
 
-    for document in tqdm.tqdm(Document.objects.all(), disable=progress_bar_disable):
-        document_nodes = build_document_node(document)
-        nodes.extend(document_nodes)
+    documents = Document.objects.all()
+    if not documents.exists():
+        logger.warning("No documents found to index.")
+        return
+
+    if rebuild:
+        # Rebuild index from scratch
+        for document in tqdm.tqdm(documents, disable=progress_bar_disable):
+            document_nodes = build_document_node(document)
+            nodes.extend(document_nodes)
 
-    if not nodes:
-        raise RuntimeError(
-            "No nodes to index — check that documents are available and have content.",
+        VectorStoreIndex(
+            nodes=nodes,
+            storage_context=storage_context,
+            embed_model=embed_model,
+            show_progress=not progress_bar_disable,
         )
+    else:
+        # Update existing index
+        index = load_or_build_index(storage_context, embed_model)
+        all_node_ids = list(index.docstore.docs.keys())
+        existing_nodes = {
+            node.metadata.get("document_id"): node
+            for node in index.docstore.get_nodes(all_node_ids)
+        }
+
+        node_ids_to_remove = []
+
+        for document in tqdm.tqdm(documents, disable=progress_bar_disable):
+            doc_id = str(document.id)
+            document_modified = document.modified.isoformat()
+
+            if doc_id in existing_nodes:
+                node = existing_nodes[doc_id]
+                node_modified = node.metadata.get("modified")
+
+                if node_modified == document_modified:
+                    continue
+
+                node_ids_to_remove.append(node.node_id)
+                nodes.extend(build_document_node(document))
+            else:
+                # New document, add it
+                nodes.extend(build_document_node(document))
+
+        if node_ids_to_remove or nodes:
+            logger.info(
+                "Updating LLM index with %d new nodes and removing %d old nodes.",
+                len(nodes),
+                len(node_ids_to_remove),
+            )
+            if node_ids_to_remove:
+                index.delete_nodes(node_ids_to_remove)
+            if nodes:
+                index.insert_nodes(nodes)
+        else:
+            logger.info("No changes detected, skipping llm index rebuild.")
 
-    VectorStoreIndex(
-        nodes=nodes,
-        storage_context=storage_context,
-        embed_model=embed_model,
-    )
     storage_context.persist(persist_dir=settings.LLM_INDEX_DIR)
 
 
@@ -187,6 +228,7 @@ def llm_index_remove_document(document: Document):
     storage_context = get_or_create_storage_context(rebuild=False)
 
     index = load_or_build_index(storage_context, embed_model)
+
     if index is None:
         return
 
index ca61e00c768fb030f33404fd346dd2f23110d125..c263ed6feebb3ed8f67dbeeaa53e6da175ca6db8 100644 (file)
@@ -201,6 +201,4 @@ class AIConfig(BaseConfig):
         self.llm_url = app_config.llm_url or settings.LLM_URL
 
     def llm_index_enabled(self) -> bool:
-        return (
-            self.ai_enabled and self.llm_embedding_backend and self.llm_embedding_model
-        )
+        return self.ai_enabled and self.llm_embedding_backend
index 4847fd64e088b061a2812d2ffab28c46042613fc..f060fa89e6067563c333c496e0a34ebbe151ebde 100644 (file)
@@ -234,6 +234,20 @@ def _parse_beat_schedule() -> dict:
                 "expires": 59.0 * 60.0,
             },
         },
+        {
+            "name": "Rebuild LLM index",
+            "env_key": "PAPERLESS_LLM_INDEX_TASK_CRON",
+            # Default daily at 02:10
+            "env_default": "10 2 * * *",
+            "task": "documents.tasks.llmindex_index",
+            "options": {
+                # 1 hour before default schedule sends again
+                "expires": 23.0 * 60.0 * 60.0,
+                "kwargs": {
+                    "progress_bar_disable": True,
+                },
+            },
+        },
     ]
     for task in tasks:
         # Either get the environment setting or use the default
index 24cdedaad7b399dd5cb3ba32b75c3d66ce9794f4..970f8293da41407034eee3258fd4c12964f31a0b 100644 (file)
@@ -53,7 +53,7 @@ class FakeEmbedding(BaseEmbedding):
 def test_build_document_node(real_document):
     nodes = indexing.build_document_node(real_document)
     assert len(nodes) > 0
-    assert nodes[0].metadata["document_id"] == real_document.id
+    assert nodes[0].metadata["document_id"] == str(real_document.id)
 
 
 @pytest.mark.django_db
@@ -63,8 +63,11 @@ def test_rebuild_llm_index(
     mock_embed_model,
 ):
     with patch("documents.models.Document.objects.all") as mock_all:
-        mock_all.return_value = [real_document]
-        indexing.rebuild_llm_index(rebuild=True)
+        mock_queryset = MagicMock()
+        mock_queryset.exists.return_value = True
+        mock_queryset.__iter__.return_value = iter([real_document])
+        mock_all.return_value = mock_queryset
+        indexing.update_llm_index(rebuild=True)
 
         assert any(temp_llm_index_dir.glob("*.json"))
 
@@ -75,7 +78,7 @@ def test_add_or_update_document_updates_existing_entry(
     real_document,
     mock_embed_model,
 ):
-    indexing.rebuild_llm_index(rebuild=True)
+    indexing.update_llm_index(rebuild=True)
     indexing.llm_index_add_or_update_document(real_document)
 
     assert any(temp_llm_index_dir.glob("*.json"))
@@ -87,7 +90,7 @@ def test_remove_document_deletes_node_from_docstore(
     real_document,
     mock_embed_model,
 ):
-    indexing.rebuild_llm_index(rebuild=True)
+    indexing.update_llm_index(rebuild=True)
     indexing.llm_index_add_or_update_document(real_document)
     indexing.llm_index_remove_document(real_document)
 
@@ -100,10 +103,17 @@ def test_rebuild_llm_index_no_documents(
     mock_embed_model,
 ):
     with patch("documents.models.Document.objects.all") as mock_all:
-        mock_all.return_value = []
-
-        with pytest.raises(RuntimeError, match="No nodes to index"):
-            indexing.rebuild_llm_index(rebuild=True)
+        mock_queryset = MagicMock()
+        mock_queryset.exists.return_value = False
+        mock_queryset.__iter__.return_value = iter([])
+        mock_all.return_value = mock_queryset
+
+        # check log message
+        with patch("paperless.ai.indexing.logger") as mock_logger:
+            indexing.update_llm_index(rebuild=True)
+            mock_logger.warning.assert_called_once_with(
+                "No documents found to index.",
+            )
 
 
 def test_query_similar_documents(
index 8a191f209bad203fec7d3604e989137ce4d9b7d6..c0b75958269444efba45d7b4dc07b25868d51708 100644 (file)
@@ -161,6 +161,7 @@ class TestCeleryScheduleParsing(TestCase):
     SANITY_EXPIRE_TIME = ((7.0 * 24.0) - 1.0) * 60.0 * 60.0
     EMPTY_TRASH_EXPIRE_TIME = 23.0 * 60.0 * 60.0
     RUN_SCHEDULED_WORKFLOWS_EXPIRE_TIME = 59.0 * 60.0
+    LLM_INDEX_EXPIRE_TIME = 23.0 * 60.0 * 60.0
 
     def test_schedule_configuration_default(self):
         """
@@ -205,6 +206,16 @@ class TestCeleryScheduleParsing(TestCase):
                     "schedule": crontab(minute="5", hour="*/1"),
                     "options": {"expires": self.RUN_SCHEDULED_WORKFLOWS_EXPIRE_TIME},
                 },
+                "Rebuild LLM index": {
+                    "task": "documents.tasks.llmindex_index",
+                    "schedule": crontab(minute=10, hour=2),
+                    "options": {
+                        "expires": self.LLM_INDEX_EXPIRE_TIME,
+                        "kwargs": {
+                            "progress_bar_disable": True,
+                        },
+                    },
+                },
             },
             schedule,
         )
@@ -257,6 +268,16 @@ class TestCeleryScheduleParsing(TestCase):
                     "schedule": crontab(minute="5", hour="*/1"),
                     "options": {"expires": self.RUN_SCHEDULED_WORKFLOWS_EXPIRE_TIME},
                 },
+                "Rebuild LLM index": {
+                    "task": "documents.tasks.llmindex_index",
+                    "schedule": crontab(minute=10, hour=2),
+                    "options": {
+                        "expires": self.LLM_INDEX_EXPIRE_TIME,
+                        "kwargs": {
+                            "progress_bar_disable": True,
+                        },
+                    },
+                },
             },
             schedule,
         )
@@ -301,6 +322,16 @@ class TestCeleryScheduleParsing(TestCase):
                     "schedule": crontab(minute="5", hour="*/1"),
                     "options": {"expires": self.RUN_SCHEDULED_WORKFLOWS_EXPIRE_TIME},
                 },
+                "Rebuild LLM index": {
+                    "task": "documents.tasks.llmindex_index",
+                    "schedule": crontab(minute=10, hour=2),
+                    "options": {
+                        "expires": self.LLM_INDEX_EXPIRE_TIME,
+                        "kwargs": {
+                            "progress_bar_disable": True,
+                        },
+                    },
+                },
             },
             schedule,
         )
@@ -323,6 +354,7 @@ class TestCeleryScheduleParsing(TestCase):
                 "PAPERLESS_INDEX_TASK_CRON": "disable",
                 "PAPERLESS_EMPTY_TRASH_TASK_CRON": "disable",
                 "PAPERLESS_WORKFLOW_SCHEDULED_TASK_CRON": "disable",
+                "PAPERLESS_LLM_INDEX_TASK_CRON": "disable",
             },
         ):
             schedule = _parse_beat_schedule()