]> git.ipfire.org Git - thirdparty/paperless-ngx.git/commitdiff
Fix: Handle tanvity index lock contention (#12856)
authorTrenton H <797416+stumpylog@users.noreply.github.com>
Wed, 27 May 2026 16:47:13 +0000 (09:47 -0700)
committerGitHub <noreply@github.com>
Wed, 27 May 2026 16:47:13 +0000 (09:47 -0700)
Implements and tests a retry with backoff + jitter for aquring the index update lock.  If we still can't get it, dispatch a celery task to handle it later instead (also with retry)

Signed-off-by: stumpylog <797416+stumpylog@users.noreply.github.com>
src/documents/search/_backend.py
src/documents/tasks.py
src/documents/tests/search/test_lock_backoff.py [new file with mode: 0644]

index cd2ab031cfb3a34f9468c9cf967d7fa1eb8e863d..68a9aecadb0c204dbcd420d0ce94f5dfe1502316 100644 (file)
@@ -1,12 +1,15 @@
 from __future__ import annotations
 
 import logging
+import random
 import re
 import threading
+import time
 from datetime import UTC
 from datetime import datetime
 from enum import StrEnum
 from typing import TYPE_CHECKING
+from typing import Final
 from typing import Self
 from typing import TypedDict
 from typing import TypeVar
@@ -43,6 +46,11 @@ if TYPE_CHECKING:
 
 logger = logging.getLogger("paperless.search")
 
+_LOCK_TIMEOUT_SECONDS: Final[float] = 10.0  # per-attempt acquire timeout
+_LOCK_RETRY_ATTEMPTS: Final[int] = 4  # total attempts (1 initial + 3 retries)
+_LOCK_BACKOFF_BASE: Final[float] = 1.0  # seconds
+_LOCK_BACKOFF_CAP: Final[float] = 10.0  # seconds
+
 _WORD_RE = regex.compile(r"\w+")
 _AUTOCOMPLETE_REGEX_TIMEOUT = 1.0  # seconds; guards against ReDoS on untrusted content
 
@@ -183,12 +191,27 @@ class WriteBatch:
         if self._backend._path is not None:
             lock_path = self._backend._path / ".tantivy.lock"
             self._lock = filelock.FileLock(str(lock_path))
-            try:
-                self._lock.acquire(timeout=self._lock_timeout)
-            except filelock.Timeout as e:  # pragma: no cover
-                raise SearchIndexLockError(
-                    f"Could not acquire index lock within {self._lock_timeout}s",
-                ) from e
+            for attempt in range(_LOCK_RETRY_ATTEMPTS):
+                try:
+                    self._lock.acquire(timeout=self._lock_timeout)
+                    break
+                except filelock.Timeout:
+                    if attempt == _LOCK_RETRY_ATTEMPTS - 1:
+                        raise SearchIndexLockError(
+                            f"Could not acquire index lock after {_LOCK_RETRY_ATTEMPTS} "
+                            f"attempts (timeout={self._lock_timeout}s each)",
+                        )
+                    sleep_s = random.uniform(
+                        0,
+                        min(_LOCK_BACKOFF_CAP, _LOCK_BACKOFF_BASE * (2**attempt)),
+                    )
+                    logger.debug(
+                        "Index lock contention; retrying in %.2fs (attempt %d/%d)",
+                        sleep_s,
+                        attempt + 1,
+                        _LOCK_RETRY_ATTEMPTS,
+                    )
+                    time.sleep(sleep_s)
 
         self._raw_writer = self._backend._index.writer()
         return self
@@ -490,13 +513,28 @@ class TantivyBackend:
         Convenience method for single-document updates. For bulk operations,
         use batch_update() context manager for better performance.
 
+        On lock exhaustion after all retry attempts, schedules a deferred
+        index_document Celery task and returns normally. Callers will NOT
+        receive a SearchIndexLockError; the index write is deferred silently.
+
         Args:
             document: Django Document instance to index
             effective_content: Override document.content for indexing
         """
         self._ensure_open()
-        with self.batch_update(lock_timeout=5.0) as batch:
-            batch.add_or_update(document, effective_content)
+        try:
+            with self.batch_update(lock_timeout=_LOCK_TIMEOUT_SECONDS) as batch:
+                batch.add_or_update(document, effective_content)
+        except SearchIndexLockError:
+            logger.error(
+                "Search index lock exhausted for document %d after %d attempts; "
+                "scheduling deferred index write",
+                document.pk,
+                _LOCK_RETRY_ATTEMPTS,
+            )
+            from documents.tasks import index_document
+
+            index_document.apply_async(args=[document.pk], countdown=60)
 
     def remove(self, doc_id: int) -> None:
         """
@@ -505,12 +543,27 @@ class TantivyBackend:
         Convenience method for single-document removal. For bulk operations,
         use batch_update() context manager for better performance.
 
+        On lock exhaustion after all retry attempts, schedules a deferred
+        remove_document_from_index Celery task and returns normally.
+        Callers will NOT receive a SearchIndexLockError.
+
         Args:
             doc_id: Primary key of the document to remove
         """
         self._ensure_open()
-        with self.batch_update(lock_timeout=5.0) as batch:
-            batch.remove(doc_id)
+        try:
+            with self.batch_update(lock_timeout=_LOCK_TIMEOUT_SECONDS) as batch:
+                batch.remove(doc_id)
+        except SearchIndexLockError:
+            logger.error(
+                "Search index lock exhausted for doc_id %d after %d attempts; "
+                "scheduling deferred index removal",
+                doc_id,
+                _LOCK_RETRY_ATTEMPTS,
+            )
+            from documents.tasks import remove_document_from_index
+
+            remove_document_from_index.apply_async(args=[doc_id], countdown=60)
 
     def highlight_hits(
         self,
index 8f346e36c400fb158c217e273772f904efef4202..3005319b04a83b320bd873a6e7a2688f7a71411a 100644 (file)
@@ -56,6 +56,7 @@ from documents.plugins.base import StopConsumeTaskError
 from documents.plugins.helpers import ProgressManager
 from documents.plugins.helpers import ProgressStatusOptions
 from documents.sanity_checker import SanityCheckFailedException
+from documents.search._backend import SearchIndexLockError
 from documents.signals import document_updated
 from documents.signals.handlers import cleanup_document_deletion
 from documents.signals.handlers import run_workflows
@@ -84,6 +85,63 @@ def index_optimize() -> None:
     )
 
 
+@shared_task(
+    bind=True,
+    ignore_result=True,
+    autoretry_for=(SearchIndexLockError,),
+    max_retries=5,
+    retry_backoff=60,
+    retry_jitter=True,
+)
+def index_document(self, document_id: int) -> None:
+    """
+    Deferred single-document index write.
+
+    Used as a self-healing fallback when add_or_update() exhausts its lock retry
+    budget during high-concurrency consumption. Runs via batch_update() directly
+    to avoid re-entering the deferred scheduling path in add_or_update().
+
+    If the document was deleted before this task runs, it exits cleanly.
+    """
+    from documents.search import get_backend
+
+    try:
+        document = Document.objects.get(pk=document_id)
+    except Document.DoesNotExist:
+        logger.info(
+            "index_document: document %d no longer exists; skipping",
+            document_id,
+        )
+        return
+    with get_backend().batch_update() as batch:
+        batch.add_or_update(
+            document,
+            effective_content=document.get_effective_content(),
+        )
+
+
+@shared_task(
+    bind=True,
+    ignore_result=True,
+    autoretry_for=(SearchIndexLockError,),
+    max_retries=5,
+    retry_backoff=60,
+    retry_jitter=True,
+)
+def remove_document_from_index(self, doc_id: int) -> None:
+    """
+    Deferred single-document index removal.
+
+    Used as a self-healing fallback when remove() exhausts its lock retry budget.
+    Operates only on the Tantivy index; no database lookup required.
+    If the document has already been removed, the term-query delete is a no-op.
+    """
+    from documents.search import get_backend
+
+    with get_backend().batch_update() as batch:
+        batch.remove(doc_id)
+
+
 @shared_task
 def train_classifier(
     *,
diff --git a/src/documents/tests/search/test_lock_backoff.py b/src/documents/tests/search/test_lock_backoff.py
new file mode 100644 (file)
index 0000000..936bd7f
--- /dev/null
@@ -0,0 +1,248 @@
+"""Tests for search index lock backoff, retry logic, and self-healing deferred tasks."""
+
+from __future__ import annotations
+
+import logging
+from typing import TYPE_CHECKING
+
+import filelock
+import pytest
+
+from documents.search._backend import _LOCK_BACKOFF_CAP
+from documents.search._backend import _LOCK_RETRY_ATTEMPTS
+from documents.search._backend import _LOCK_TIMEOUT_SECONDS
+from documents.search._backend import SearchIndexLockError
+from documents.search._backend import TantivyBackend
+from documents.tasks import index_document
+from documents.tasks import remove_document_from_index
+from documents.tests.factories import DocumentFactory
+
+if TYPE_CHECKING:
+    from collections.abc import Generator
+    from pathlib import Path
+
+    from pytest_mock import MockerFixture
+
+pytestmark = pytest.mark.search
+
+
+@pytest.fixture
+def disk_backend(tmp_path: Path) -> Generator[TantivyBackend, None, None]:
+    """On-disk TantivyBackend so the file-lock code path is exercised."""
+    b = TantivyBackend(path=tmp_path)
+    b.open()
+    try:
+        yield b
+    finally:
+        b.close()
+
+
+class TestWriteBatchLockRetry:
+    """Test WriteBatch retry loop with backoff + full jitter."""
+
+    @pytest.mark.django_db
+    def test_lock_retries_then_succeeds(
+        self,
+        disk_backend: TantivyBackend,
+        mocker: MockerFixture,
+    ) -> None:
+        """Timeout on first 3 attempts then success on 4th — document must be indexed."""
+        doc = DocumentFactory()
+
+        acquire_calls = 0
+
+        def flaky_acquire(timeout: float) -> None:
+            nonlocal acquire_calls
+            acquire_calls += 1
+            # Raise Timeout for first _LOCK_RETRY_ATTEMPTS - 1 calls, succeed on last
+            if acquire_calls < _LOCK_RETRY_ATTEMPTS:
+                raise filelock.Timeout("")
+
+        sleep_values: list[float] = []
+
+        mocker.patch(
+            "documents.search._backend.filelock.FileLock.acquire",
+            side_effect=flaky_acquire,
+        )
+        mock_sleep = mocker.patch(
+            "documents.search._backend.time.sleep",
+            side_effect=lambda s: sleep_values.append(s),
+        )
+
+        # Should not raise — 4th attempt succeeds
+        with disk_backend.batch_update(lock_timeout=_LOCK_TIMEOUT_SECONDS) as batch:
+            batch.add_or_update(doc)
+
+        # sleep called exactly _LOCK_RETRY_ATTEMPTS - 1 times (once per failed attempt)
+        assert mock_sleep.call_count == _LOCK_RETRY_ATTEMPTS - 1
+
+        # All sleep values must be in [0, _LOCK_BACKOFF_CAP]
+        for s in sleep_values:
+            assert 0 <= s <= _LOCK_BACKOFF_CAP, (
+                f"Sleep value {s} outside [0, {_LOCK_BACKOFF_CAP}]"
+            )
+
+    def test_lock_exhaustion_raises_search_index_lock_error(
+        self,
+        disk_backend: TantivyBackend,
+        mocker: MockerFixture,
+    ) -> None:
+        """All acquire attempts raise Timeout — WriteBatch must raise SearchIndexLockError."""
+        mocker.patch(
+            "documents.search._backend.filelock.FileLock.acquire",
+            side_effect=filelock.Timeout(""),
+        )
+        mocker.patch("documents.search._backend.time.sleep")
+
+        with pytest.raises(SearchIndexLockError):
+            with disk_backend.batch_update(lock_timeout=_LOCK_TIMEOUT_SECONDS):
+                pass
+
+    def test_jitter_values_in_range(
+        self,
+        disk_backend: TantivyBackend,
+        mocker: MockerFixture,
+    ) -> None:
+        """Sleep values must always lie in [0, _LOCK_BACKOFF_CAP] across many samples."""
+        mocker.patch(
+            "documents.search._backend.filelock.FileLock.acquire",
+            side_effect=filelock.Timeout(""),
+        )
+        sleep_values: list[float] = []
+        mocker.patch(
+            "documents.search._backend.time.sleep",
+            side_effect=lambda s: sleep_values.append(s),
+        )
+        for _ in range(50):
+            sleep_values.clear()
+            with pytest.raises(SearchIndexLockError):
+                with disk_backend.batch_update(lock_timeout=_LOCK_TIMEOUT_SECONDS):
+                    pass
+
+            for s in sleep_values:
+                assert 0 <= s <= _LOCK_BACKOFF_CAP, (
+                    f"Jitter {s} exceeds cap {_LOCK_BACKOFF_CAP}"
+                )
+
+
+class TestAddOrUpdateDeferredScheduling:
+    """Test that add_or_update() and remove() defer to Celery on lock exhaustion."""
+
+    @pytest.mark.django_db
+    def test_lock_exhaustion_schedules_deferred_task(
+        self,
+        disk_backend: TantivyBackend,
+        mocker: MockerFixture,
+    ) -> None:
+        """Lock exhaustion in add_or_update must schedule index_document task, not raise."""
+        doc = DocumentFactory()
+
+        mocker.patch(
+            "documents.search._backend.filelock.FileLock.acquire",
+            side_effect=filelock.Timeout(""),
+        )
+        mocker.patch("documents.search._backend.time.sleep")
+        mock_apply = mocker.patch("documents.tasks.index_document.apply_async")
+
+        # Must NOT raise
+        disk_backend.add_or_update(doc)
+
+        mock_apply.assert_called_once_with(args=[doc.pk], countdown=60)
+
+    def test_remove_exhaustion_schedules_deferred_task(
+        self,
+        disk_backend: TantivyBackend,
+        mocker: MockerFixture,
+    ) -> None:
+        """Lock exhaustion in remove() must schedule remove_document_from_index task, not raise."""
+        doc_id = 503
+
+        mocker.patch(
+            "documents.search._backend.filelock.FileLock.acquire",
+            side_effect=filelock.Timeout(""),
+        )
+        mocker.patch("documents.search._backend.time.sleep")
+        mock_apply = mocker.patch(
+            "documents.tasks.remove_document_from_index.apply_async",
+        )
+
+        # Must NOT raise
+        disk_backend.remove(doc_id)
+
+        mock_apply.assert_called_once_with(args=[doc_id], countdown=60)
+
+
+@pytest.mark.django_db
+class TestIndexDocumentTask:
+    """Test the deferred index_document and remove_document_from_index Celery tasks."""
+
+    def test_index_document_task_skips_deleted_document(
+        self,
+        caplog: pytest.LogCaptureFixture,
+    ) -> None:
+        """index_document with a non-existent doc_id must return cleanly and log INFO."""
+        nonexistent_id = 999999
+
+        with caplog.at_level(logging.INFO, logger="paperless.tasks"):
+            index_document(nonexistent_id)
+
+        assert any("no longer exists" in record.message for record in caplog.records), (
+            "Expected INFO log about missing document"
+        )
+
+    def test_index_document_task_indexes_existing_document(
+        self,
+        backend: TantivyBackend,
+        mocker: MockerFixture,
+    ) -> None:
+        """index_document task must add the document to the index via batch_update."""
+        doc = DocumentFactory(content="via deferred task")
+
+        # get_backend is imported lazily inside the task: `from documents.search import get_backend`
+        mocker.patch(
+            "documents.search.get_backend",
+            return_value=backend,
+        )
+        index_document(doc.pk)
+
+        ids = backend.search_ids("deferred task", user=None)
+        assert doc.pk in ids
+
+    def test_remove_document_from_index_task_removes_existing_document(
+        self,
+        backend: TantivyBackend,
+        mocker: MockerFixture,
+    ) -> None:
+        """remove_document_from_index task must remove the document from the index."""
+        doc = DocumentFactory(content="will be removed by deferred task")
+        backend.add_or_update(doc)
+        assert doc.pk in backend.search_ids("removed", user=None)
+
+        mocker.patch("documents.search.get_backend", return_value=backend)
+        remove_document_from_index(doc.pk)
+
+        assert doc.pk not in backend.search_ids("removed", user=None)
+
+    def test_task_does_not_swallow_lock_error(
+        self,
+        mocker: MockerFixture,
+    ) -> None:
+        """Verifies the task body propagates SearchIndexLockError so Celery's
+        autoretry_for can catch it (rather than the task swallowing the error
+        and silently succeeding)."""
+        doc = DocumentFactory()
+
+        mock_batch = mocker.MagicMock()
+        mock_batch.__enter__ = mocker.MagicMock(
+            side_effect=SearchIndexLockError("exhausted"),
+        )
+        mock_batch.__exit__ = mocker.MagicMock(return_value=False)
+
+        mock_backend = mocker.MagicMock()
+        mock_backend.batch_update.return_value = mock_batch
+
+        # get_backend is imported lazily inside the task: `from documents.search import get_backend`
+        mocker.patch("documents.search.get_backend", return_value=mock_backend)
+
+        with pytest.raises(SearchIndexLockError):
+            index_document(doc.pk)