from __future__ import annotations
import logging
+import random
import re
import threading
+import time
from datetime import UTC
from datetime import datetime
from enum import StrEnum
from typing import TYPE_CHECKING
+from typing import Final
from typing import Self
from typing import TypedDict
from typing import TypeVar
logger = logging.getLogger("paperless.search")
+_LOCK_TIMEOUT_SECONDS: Final[float] = 10.0 # per-attempt acquire timeout
+_LOCK_RETRY_ATTEMPTS: Final[int] = 4 # total attempts (1 initial + 3 retries)
+_LOCK_BACKOFF_BASE: Final[float] = 1.0 # seconds
+_LOCK_BACKOFF_CAP: Final[float] = 10.0 # seconds
+
_WORD_RE = regex.compile(r"\w+")
_AUTOCOMPLETE_REGEX_TIMEOUT = 1.0 # seconds; guards against ReDoS on untrusted content
if self._backend._path is not None:
lock_path = self._backend._path / ".tantivy.lock"
self._lock = filelock.FileLock(str(lock_path))
- try:
- self._lock.acquire(timeout=self._lock_timeout)
- except filelock.Timeout as e: # pragma: no cover
- raise SearchIndexLockError(
- f"Could not acquire index lock within {self._lock_timeout}s",
- ) from e
+ for attempt in range(_LOCK_RETRY_ATTEMPTS):
+ try:
+ self._lock.acquire(timeout=self._lock_timeout)
+ break
+ except filelock.Timeout:
+ if attempt == _LOCK_RETRY_ATTEMPTS - 1:
+ raise SearchIndexLockError(
+ f"Could not acquire index lock after {_LOCK_RETRY_ATTEMPTS} "
+ f"attempts (timeout={self._lock_timeout}s each)",
+ )
+ sleep_s = random.uniform(
+ 0,
+ min(_LOCK_BACKOFF_CAP, _LOCK_BACKOFF_BASE * (2**attempt)),
+ )
+ logger.debug(
+ "Index lock contention; retrying in %.2fs (attempt %d/%d)",
+ sleep_s,
+ attempt + 1,
+ _LOCK_RETRY_ATTEMPTS,
+ )
+ time.sleep(sleep_s)
self._raw_writer = self._backend._index.writer()
return self
Convenience method for single-document updates. For bulk operations,
use batch_update() context manager for better performance.
+ On lock exhaustion after all retry attempts, schedules a deferred
+ index_document Celery task and returns normally. Callers will NOT
+ receive a SearchIndexLockError; the index write is deferred silently.
+
Args:
document: Django Document instance to index
effective_content: Override document.content for indexing
"""
self._ensure_open()
- with self.batch_update(lock_timeout=5.0) as batch:
- batch.add_or_update(document, effective_content)
+ try:
+ with self.batch_update(lock_timeout=_LOCK_TIMEOUT_SECONDS) as batch:
+ batch.add_or_update(document, effective_content)
+ except SearchIndexLockError:
+ logger.error(
+ "Search index lock exhausted for document %d after %d attempts; "
+ "scheduling deferred index write",
+ document.pk,
+ _LOCK_RETRY_ATTEMPTS,
+ )
+ from documents.tasks import index_document
+
+ index_document.apply_async(args=[document.pk], countdown=60)
def remove(self, doc_id: int) -> None:
"""
Convenience method for single-document removal. For bulk operations,
use batch_update() context manager for better performance.
+ On lock exhaustion after all retry attempts, schedules a deferred
+ remove_document_from_index Celery task and returns normally.
+ Callers will NOT receive a SearchIndexLockError.
+
Args:
doc_id: Primary key of the document to remove
"""
self._ensure_open()
- with self.batch_update(lock_timeout=5.0) as batch:
- batch.remove(doc_id)
+ try:
+ with self.batch_update(lock_timeout=_LOCK_TIMEOUT_SECONDS) as batch:
+ batch.remove(doc_id)
+ except SearchIndexLockError:
+ logger.error(
+ "Search index lock exhausted for doc_id %d after %d attempts; "
+ "scheduling deferred index removal",
+ doc_id,
+ _LOCK_RETRY_ATTEMPTS,
+ )
+ from documents.tasks import remove_document_from_index
+
+ remove_document_from_index.apply_async(args=[doc_id], countdown=60)
def highlight_hits(
self,
from documents.plugins.helpers import ProgressManager
from documents.plugins.helpers import ProgressStatusOptions
from documents.sanity_checker import SanityCheckFailedException
+from documents.search._backend import SearchIndexLockError
from documents.signals import document_updated
from documents.signals.handlers import cleanup_document_deletion
from documents.signals.handlers import run_workflows
)
+@shared_task(
+ bind=True,
+ ignore_result=True,
+ autoretry_for=(SearchIndexLockError,),
+ max_retries=5,
+ retry_backoff=60,
+ retry_jitter=True,
+)
+def index_document(self, document_id: int) -> None:
+ """
+ Deferred single-document index write.
+
+ Used as a self-healing fallback when add_or_update() exhausts its lock retry
+ budget during high-concurrency consumption. Runs via batch_update() directly
+ to avoid re-entering the deferred scheduling path in add_or_update().
+
+ If the document was deleted before this task runs, it exits cleanly.
+ """
+ from documents.search import get_backend
+
+ try:
+ document = Document.objects.get(pk=document_id)
+ except Document.DoesNotExist:
+ logger.info(
+ "index_document: document %d no longer exists; skipping",
+ document_id,
+ )
+ return
+ with get_backend().batch_update() as batch:
+ batch.add_or_update(
+ document,
+ effective_content=document.get_effective_content(),
+ )
+
+
+@shared_task(
+ bind=True,
+ ignore_result=True,
+ autoretry_for=(SearchIndexLockError,),
+ max_retries=5,
+ retry_backoff=60,
+ retry_jitter=True,
+)
+def remove_document_from_index(self, doc_id: int) -> None:
+ """
+ Deferred single-document index removal.
+
+ Used as a self-healing fallback when remove() exhausts its lock retry budget.
+ Operates only on the Tantivy index; no database lookup required.
+ If the document has already been removed, the term-query delete is a no-op.
+ """
+ from documents.search import get_backend
+
+ with get_backend().batch_update() as batch:
+ batch.remove(doc_id)
+
+
@shared_task
def train_classifier(
*,
--- /dev/null
+"""Tests for search index lock backoff, retry logic, and self-healing deferred tasks."""
+
+from __future__ import annotations
+
+import logging
+from typing import TYPE_CHECKING
+
+import filelock
+import pytest
+
+from documents.search._backend import _LOCK_BACKOFF_CAP
+from documents.search._backend import _LOCK_RETRY_ATTEMPTS
+from documents.search._backend import _LOCK_TIMEOUT_SECONDS
+from documents.search._backend import SearchIndexLockError
+from documents.search._backend import TantivyBackend
+from documents.tasks import index_document
+from documents.tasks import remove_document_from_index
+from documents.tests.factories import DocumentFactory
+
+if TYPE_CHECKING:
+ from collections.abc import Generator
+ from pathlib import Path
+
+ from pytest_mock import MockerFixture
+
+pytestmark = pytest.mark.search
+
+
+@pytest.fixture
+def disk_backend(tmp_path: Path) -> Generator[TantivyBackend, None, None]:
+ """On-disk TantivyBackend so the file-lock code path is exercised."""
+ b = TantivyBackend(path=tmp_path)
+ b.open()
+ try:
+ yield b
+ finally:
+ b.close()
+
+
+class TestWriteBatchLockRetry:
+ """Test WriteBatch retry loop with backoff + full jitter."""
+
+ @pytest.mark.django_db
+ def test_lock_retries_then_succeeds(
+ self,
+ disk_backend: TantivyBackend,
+ mocker: MockerFixture,
+ ) -> None:
+ """Timeout on first 3 attempts then success on 4th — document must be indexed."""
+ doc = DocumentFactory()
+
+ acquire_calls = 0
+
+ def flaky_acquire(timeout: float) -> None:
+ nonlocal acquire_calls
+ acquire_calls += 1
+ # Raise Timeout for first _LOCK_RETRY_ATTEMPTS - 1 calls, succeed on last
+ if acquire_calls < _LOCK_RETRY_ATTEMPTS:
+ raise filelock.Timeout("")
+
+ sleep_values: list[float] = []
+
+ mocker.patch(
+ "documents.search._backend.filelock.FileLock.acquire",
+ side_effect=flaky_acquire,
+ )
+ mock_sleep = mocker.patch(
+ "documents.search._backend.time.sleep",
+ side_effect=lambda s: sleep_values.append(s),
+ )
+
+ # Should not raise — 4th attempt succeeds
+ with disk_backend.batch_update(lock_timeout=_LOCK_TIMEOUT_SECONDS) as batch:
+ batch.add_or_update(doc)
+
+ # sleep called exactly _LOCK_RETRY_ATTEMPTS - 1 times (once per failed attempt)
+ assert mock_sleep.call_count == _LOCK_RETRY_ATTEMPTS - 1
+
+ # All sleep values must be in [0, _LOCK_BACKOFF_CAP]
+ for s in sleep_values:
+ assert 0 <= s <= _LOCK_BACKOFF_CAP, (
+ f"Sleep value {s} outside [0, {_LOCK_BACKOFF_CAP}]"
+ )
+
+ def test_lock_exhaustion_raises_search_index_lock_error(
+ self,
+ disk_backend: TantivyBackend,
+ mocker: MockerFixture,
+ ) -> None:
+ """All acquire attempts raise Timeout — WriteBatch must raise SearchIndexLockError."""
+ mocker.patch(
+ "documents.search._backend.filelock.FileLock.acquire",
+ side_effect=filelock.Timeout(""),
+ )
+ mocker.patch("documents.search._backend.time.sleep")
+
+ with pytest.raises(SearchIndexLockError):
+ with disk_backend.batch_update(lock_timeout=_LOCK_TIMEOUT_SECONDS):
+ pass
+
+ def test_jitter_values_in_range(
+ self,
+ disk_backend: TantivyBackend,
+ mocker: MockerFixture,
+ ) -> None:
+ """Sleep values must always lie in [0, _LOCK_BACKOFF_CAP] across many samples."""
+ mocker.patch(
+ "documents.search._backend.filelock.FileLock.acquire",
+ side_effect=filelock.Timeout(""),
+ )
+ sleep_values: list[float] = []
+ mocker.patch(
+ "documents.search._backend.time.sleep",
+ side_effect=lambda s: sleep_values.append(s),
+ )
+ for _ in range(50):
+ sleep_values.clear()
+ with pytest.raises(SearchIndexLockError):
+ with disk_backend.batch_update(lock_timeout=_LOCK_TIMEOUT_SECONDS):
+ pass
+
+ for s in sleep_values:
+ assert 0 <= s <= _LOCK_BACKOFF_CAP, (
+ f"Jitter {s} exceeds cap {_LOCK_BACKOFF_CAP}"
+ )
+
+
+class TestAddOrUpdateDeferredScheduling:
+ """Test that add_or_update() and remove() defer to Celery on lock exhaustion."""
+
+ @pytest.mark.django_db
+ def test_lock_exhaustion_schedules_deferred_task(
+ self,
+ disk_backend: TantivyBackend,
+ mocker: MockerFixture,
+ ) -> None:
+ """Lock exhaustion in add_or_update must schedule index_document task, not raise."""
+ doc = DocumentFactory()
+
+ mocker.patch(
+ "documents.search._backend.filelock.FileLock.acquire",
+ side_effect=filelock.Timeout(""),
+ )
+ mocker.patch("documents.search._backend.time.sleep")
+ mock_apply = mocker.patch("documents.tasks.index_document.apply_async")
+
+ # Must NOT raise
+ disk_backend.add_or_update(doc)
+
+ mock_apply.assert_called_once_with(args=[doc.pk], countdown=60)
+
+ def test_remove_exhaustion_schedules_deferred_task(
+ self,
+ disk_backend: TantivyBackend,
+ mocker: MockerFixture,
+ ) -> None:
+ """Lock exhaustion in remove() must schedule remove_document_from_index task, not raise."""
+ doc_id = 503
+
+ mocker.patch(
+ "documents.search._backend.filelock.FileLock.acquire",
+ side_effect=filelock.Timeout(""),
+ )
+ mocker.patch("documents.search._backend.time.sleep")
+ mock_apply = mocker.patch(
+ "documents.tasks.remove_document_from_index.apply_async",
+ )
+
+ # Must NOT raise
+ disk_backend.remove(doc_id)
+
+ mock_apply.assert_called_once_with(args=[doc_id], countdown=60)
+
+
+@pytest.mark.django_db
+class TestIndexDocumentTask:
+ """Test the deferred index_document and remove_document_from_index Celery tasks."""
+
+ def test_index_document_task_skips_deleted_document(
+ self,
+ caplog: pytest.LogCaptureFixture,
+ ) -> None:
+ """index_document with a non-existent doc_id must return cleanly and log INFO."""
+ nonexistent_id = 999999
+
+ with caplog.at_level(logging.INFO, logger="paperless.tasks"):
+ index_document(nonexistent_id)
+
+ assert any("no longer exists" in record.message for record in caplog.records), (
+ "Expected INFO log about missing document"
+ )
+
+ def test_index_document_task_indexes_existing_document(
+ self,
+ backend: TantivyBackend,
+ mocker: MockerFixture,
+ ) -> None:
+ """index_document task must add the document to the index via batch_update."""
+ doc = DocumentFactory(content="via deferred task")
+
+ # get_backend is imported lazily inside the task: `from documents.search import get_backend`
+ mocker.patch(
+ "documents.search.get_backend",
+ return_value=backend,
+ )
+ index_document(doc.pk)
+
+ ids = backend.search_ids("deferred task", user=None)
+ assert doc.pk in ids
+
+ def test_remove_document_from_index_task_removes_existing_document(
+ self,
+ backend: TantivyBackend,
+ mocker: MockerFixture,
+ ) -> None:
+ """remove_document_from_index task must remove the document from the index."""
+ doc = DocumentFactory(content="will be removed by deferred task")
+ backend.add_or_update(doc)
+ assert doc.pk in backend.search_ids("removed", user=None)
+
+ mocker.patch("documents.search.get_backend", return_value=backend)
+ remove_document_from_index(doc.pk)
+
+ assert doc.pk not in backend.search_ids("removed", user=None)
+
+ def test_task_does_not_swallow_lock_error(
+ self,
+ mocker: MockerFixture,
+ ) -> None:
+ """Verifies the task body propagates SearchIndexLockError so Celery's
+ autoretry_for can catch it (rather than the task swallowing the error
+ and silently succeeding)."""
+ doc = DocumentFactory()
+
+ mock_batch = mocker.MagicMock()
+ mock_batch.__enter__ = mocker.MagicMock(
+ side_effect=SearchIndexLockError("exhausted"),
+ )
+ mock_batch.__exit__ = mocker.MagicMock(return_value=False)
+
+ mock_backend = mocker.MagicMock()
+ mock_backend.batch_update.return_value = mock_batch
+
+ # get_backend is imported lazily inside the task: `from documents.search import get_backend`
+ mocker.patch("documents.search.get_backend", return_value=mock_backend)
+
+ with pytest.raises(SearchIndexLockError):
+ index_document(doc.pk)