diff --git a/CHANGELOG.md b/CHANGELOG.md
index c0ddb08016..3934f38cc5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,10 @@ and this project adheres to
 
 ### Changed
 
+- ✨(backend) improve indexing command
+  - checkpoint recovery
+  - asynchronicity
+  - admin command trigger
 - 💄(frontend) improve comments highlights #1961
 
 ## [v4.8.3] - 2026-03-23
diff --git a/Makefile b/Makefile
index fd1d10b0e8..aa7449106b 100644
--- a/Makefile
+++ b/Makefile
@@ -260,7 +260,7 @@ demo: ## flush db then create a demo for load testing purpose
 .PHONY: demo
 
 index: ## index all documents to remote search
-	@$(MANAGE) index
+	@$(MANAGE) index $(args)
 .PHONY: index
 
 # Nota bene: Black should come after isort just in case they don't agree...
diff --git a/docs/commands/index.md b/docs/commands/index.md
new file mode 100644
index 0000000000..5c4f6d8ffc
--- /dev/null
+++ b/docs/commands/index.md
@@ -0,0 +1,74 @@
+# Index Command
+
+The `index` management command pushes documents to the remote search indexer.
+
+## Usage
+
+### Make Command
+
+```bash
+# Basic usage with defaults
+make index
+
+# With custom parameters
+make index args="--batch-size 100 --lower-time-bound 2024-01-01T00:00:00 --upper-time-bound 2026-01-01T00:00:00"
+```
+
+### Command Line
+
+```bash
+python manage.py index \
+    --lower-time-bound "2024-01-01T00:00:00" \
+    --upper-time-bound "2024-01-31T23:59:59" \
+    --batch-size 200 \
+    --async
+```
+
+### Django Admin
+
+The command is available in the Django admin interface:
+
+1. Go to `/admin/run-indexing/` to open the "Run Indexing Command" page
+2. Fill in the form with the desired parameters
+3. Click **"Run Indexing Command"**
+
+## Parameters
+
+### `--batch-size`
+- **type:** Integer
+- **default:** `settings.SEARCH_INDEXER_BATCH_SIZE`
+- **description:** Number of documents to process per batch. Higher values may improve performance but use more memory.
+
+### `--lower-time-bound`
+- **optional:** true
+- **type:** ISO 8601 datetime string
+- **default:** `None`
+- **description:** Only documents updated after this date will be indexed.
+
+### `--upper-time-bound`
+- **optional:** true
+- **type:** ISO 8601 datetime string
+- **default:** `None`
+- **description:** Only documents updated before this date will be indexed.
+
+### `--async`
+- **type:** Boolean flag
+- **default:** `False`
+- **description:** When set, dispatches the indexing job to a Celery worker instead of running it synchronously.
+
+## Crash Safe Mode
+
+The command logs the `updated_at` of the last document of each successful batch as an indexing checkpoint.
+If the process crashes, this value can be passed as `--lower-time-bound` to resume from the last successfully indexed document.
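+
+For example, assuming the last logged checkpoint was `2024-02-15T10:00:00`
+(an illustrative value), the run could be resumed with:
+
+```bash
+python manage.py index \
+    --lower-time-bound "2024-02-15T10:00:00" \
+    --batch-size 200
+```
+
+Note that the document sitting exactly on the checkpoint boundary is indexed
+again, since the lower time bound is inclusive.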
diff --git a/src/backend/core/admin.py b/src/backend/core/admin.py
index 3d2b0ef786..b8740a7b79 100644
--- a/src/backend/core/admin.py
+++ b/src/backend/core/admin.py
@@ -1,15 +1,54 @@
 """Admin classes and registrations for core app."""
 
 from django.contrib import admin, messages
+from django.contrib.admin.views.decorators import staff_member_required
 from django.contrib.auth import admin as auth_admin
-from django.shortcuts import redirect
+from django.core.management import call_command
+from django.http import HttpRequest
+from django.shortcuts import redirect, render
 from django.utils.translation import gettext_lazy as _
 from treebeard.admin import TreeAdmin
 
 from core import models
+from core.forms import RunIndexingForm
 from core.tasks.user_reconciliation import user_reconciliation_csv_import_job
 
+# Customize the default admin site's get_app_list method
+_original_get_app_list = admin.site.get_app_list
+
+
+def custom_get_app_list(_self, request, app_label=None):
+    """Add custom commands to the app list."""
+    app_list = _original_get_app_list(request, app_label)
+
+    # Add Commands app with Run Indexing command
+    commands_app = {
+        "name": _("Commands"),
+        "app_label": "commands",
+        "app_url": "#",
+        "has_module_perms": True,
+        "models": [
+            {
+                "name": _("Run indexing"),
+                "object_name": "RunIndexing",
+                "admin_url": "/admin/run-indexing/",
+                "view_only": False,
+                "add_url": None,
+                "change_url": None,
+            }
+        ],
+    }
+
+    app_list.append(commands_app)
+    return app_list
+
+
+# Monkey-patch the admin site
+admin.site.get_app_list = lambda request, app_label=None: custom_get_app_list(
+    admin.site, request, app_label
+)
+
 
 @admin.register(models.User)
 class UserAdmin(auth_admin.UserAdmin):
@@ -227,3 +266,39 @@ class InvitationAdmin(admin.ModelAdmin):
     def save_model(self, request, obj, form, change):
         obj.issuer = request.user
         obj.save()
+
+
+@staff_member_required
+def run_indexing_view(request: HttpRequest):
+    """Custom admin view for running indexing commands."""
+    if request.method == "POST":
+        form = RunIndexingForm(request.POST)
+        if form.is_valid():
+            lower_time_bound = form.cleaned_data.get("lower_time_bound")
+            upper_time_bound = form.cleaned_data.get("upper_time_bound")
+            call_command(
+                "index",
+                batch_size=form.cleaned_data["batch_size"],
+                lower_time_bound=lower_time_bound.isoformat()
+                if lower_time_bound
+                else None,
+                upper_time_bound=upper_time_bound.isoformat()
+                if upper_time_bound
+                else None,
+                async_mode=True,
+            )
+            messages.success(request, _("Indexing triggered!"))
+            return redirect("run_indexing")
+        messages.error(request, _("Please correct the errors below."))
+    else:
+        form = RunIndexingForm()
+
+    return render(
+        request=request,
+        template_name="runindexing.html",
+        context={
+            **admin.site.each_context(request),
+            "title": "Run Indexing Command",
+            "form": form,
+        },
+    )
diff --git a/src/backend/core/factories.py b/src/backend/core/factories.py
index acc39e06bd..adde38042d 100644
--- a/src/backend/core/factories.py
+++ b/src/backend/core/factories.py
@@ -6,6 +6,7 @@
 from django.contrib.auth.hashers import make_password
 
 import factory.fuzzy
+from factory import post_generation
 from faker import Faker
 
 from core import models
@@ -159,6 +160,20 @@ def masked_by(self, create, extracted, **kwargs):
                 document=self, user=item, defaults={"is_masked": True}
             )
 
+    @post_generation
+    def updated_at(self, create, extracted, **kwargs):
+        """
+        The BaseModel.updated_at field has auto_now=True.
+        This prevents setting a specific updated_at value with the factory.
+ + This post_generation method bypasses this behavior. + """ + if not create or not extracted: + return + + self.__class__.objects.filter(pk=self.pk).update(updated_at=extracted) + self.refresh_from_db() + class UserDocumentAccessFactory(factory.django.DjangoModelFactory): """Create fake document user accesses for testing.""" diff --git a/src/backend/core/forms.py b/src/backend/core/forms.py new file mode 100644 index 0000000000..00ff2500a8 --- /dev/null +++ b/src/backend/core/forms.py @@ -0,0 +1,42 @@ +"""Forms for the core app.""" + +from django import forms +from django.conf import settings +from django.utils.translation import gettext_lazy as _ + + +class RunIndexingForm(forms.Form): + """ + Form for running the indexing process. + """ + + batch_size = forms.IntegerField( + min_value=1, + initial=settings.SEARCH_INDEXER_BATCH_SIZE, + ) + lower_time_bound = forms.DateTimeField( + required=False, widget=forms.TextInput(attrs={"type": "datetime-local"}) + ) + upper_time_bound = forms.DateTimeField( + required=False, widget=forms.TextInput(attrs={"type": "datetime-local"}) + ) + + def clean(self): + """Override clean to validate time bounds.""" + cleaned_data = super().clean() + self.check_time_bounds() + return cleaned_data + + def check_time_bounds(self): + """Validate that lower_time_bound is before upper_time_bound.""" + lower_time_bound = self.cleaned_data.get("lower_time_bound") + upper_time_bound = self.cleaned_data.get("upper_time_bound") + if ( + lower_time_bound + and upper_time_bound + and lower_time_bound > upper_time_bound + ): + self.add_error( + "upper_time_bound", + _("Upper time bound must be after lower time bound."), + ) diff --git a/src/backend/core/management/commands/index.py b/src/backend/core/management/commands/index.py index af046e0608..63c6b08e35 100644 --- a/src/backend/core/management/commands/index.py +++ b/src/backend/core/management/commands/index.py @@ -4,12 +4,16 @@ import logging import time +from datetime import datetime +from django.conf import settings from django.core.management.base import BaseCommand, CommandError +from core import models from core.services.search_indexers import get_document_indexer +from core.tasks.search import batch_document_indexer_task -logger = logging.getLogger("docs.search.bootstrap_search") +logger = logging.getLogger(__name__) class Command(BaseCommand): @@ -24,9 +28,32 @@ def add_arguments(self, parser): action="store", dest="batch_size", type=int, - default=50, + default=settings.SEARCH_INDEXER_BATCH_SIZE, help="Indexation query batch size", ) + parser.add_argument( + "--lower-time-bound", + action="store", + dest="lower_time_bound", + type=datetime.fromisoformat, + default=None, + help="DateTime in ISO format. Only documents updated after this date will be indexed", + ) + parser.add_argument( + "--upper-time-bound", + action="store", + dest="upper_time_bound", + type=datetime.fromisoformat, + default=None, + help="DateTime in ISO format. 
Only documents updated before this date will be indexed",
+        )
+        parser.add_argument(
+            "--async",
+            action="store_true",
+            dest="async_mode",
+            default=False,
+            help="Whether to execute indexing asynchronously in a Celery task (default: False)",
+        )
 
     def handle(self, *args, **options):
         """Launch and log search index generation."""
@@ -35,18 +62,38 @@ def handle(self, *args, **options):
         if not indexer:
             raise CommandError("The indexer is not enabled or properly configured.")
 
-        logger.info("Starting to regenerate Find index...")
-        start = time.perf_counter()
-        batch_size = options["batch_size"]
+        if options["async_mode"]:
+            try:
+                batch_document_indexer_task.apply_async(
+                    kwargs={
+                        "lower_time_bound": options["lower_time_bound"],
+                        "upper_time_bound": options["upper_time_bound"],
+                        "batch_size": options["batch_size"],
+                        "crash_safe_mode": True,
+                    },
+                )
+            except Exception as err:
+                raise CommandError("Unable to dispatch indexing task") from err
+            logger.info("Document indexing task sent to worker")
+        else:
+            logger.info("Starting to regenerate Find index...")
+            start = time.perf_counter()
 
-        try:
-            count = indexer.index(batch_size=batch_size)
-        except Exception as err:
-            raise CommandError("Unable to regenerate index") from err
+            try:
+                count = indexer.index(
+                    queryset=models.Document.objects.filter_updated_at(
+                        lower_time_bound=options["lower_time_bound"],
+                        upper_time_bound=options["upper_time_bound"],
+                    ),
+                    batch_size=options["batch_size"],
+                    crash_safe_mode=True,
+                )
+            except Exception as err:
+                raise CommandError("Unable to regenerate index") from err
 
-        duration = time.perf_counter() - start
-        logger.info(
-            "Search index regenerated from %d document(s) in %.2f seconds.",
-            count,
-            duration,
-        )
+            duration = time.perf_counter() - start
+            logger.info(
+                "Search index regenerated from %d document(s) in %.2f seconds.",
+                count,
+                duration,
+            )
diff --git a/src/backend/core/models.py b/src/backend/core/models.py
index d1869e1323..0d8c23038d 100644
--- a/src/backend/core/models.py
+++ b/src/backend/core/models.py
@@ -859,6 +859,32 @@ def annotate_user_roles(self, user):
             user_roles=models.Value([], output_field=output_field),
         )
 
+    def filter_updated_at(self, lower_time_bound=None, upper_time_bound=None):
+        """
+        Filter documents by updated_at.
+
+        Args:
+            lower_time_bound (datetime, optional):
+                Keep documents updated after this timestamp.
+            upper_time_bound (datetime, optional):
+                Keep documents updated before this timestamp.
+
+        Returns:
+            QuerySet: Filtered queryset ready for indexation.
+        """
+        conditions = models.Q()
+        if lower_time_bound and upper_time_bound:
+            conditions = models.Q(
+                updated_at__gte=lower_time_bound,
+                updated_at__lte=upper_time_bound,
+            )
+        elif lower_time_bound:
+            conditions = models.Q(updated_at__gte=lower_time_bound)
+        elif upper_time_bound:
+            conditions = models.Q(updated_at__lte=upper_time_bound)
+
+        return self.filter(conditions)
+
 
 class DocumentManager(MP_NodeManager.from_queryset(DocumentQuerySet)):
     """
@@ -1459,13 +1485,16 @@ def restore(self):
             .first()
         )
         self.ancestors_deleted_at = ancestors_deleted_at
-        self.save(update_fields=["deleted_at", "ancestors_deleted_at"])
+        self.save(update_fields=["deleted_at", "ancestors_deleted_at", "updated_at"])
         self.invalidate_nb_accesses_cache()
 
         self.get_descendants().exclude(
             models.Q(deleted_at__isnull=False)
             | models.Q(ancestors_deleted_at__lt=current_deleted_at)
-        ).update(ancestors_deleted_at=self.ancestors_deleted_at)
+        ).update(
+            ancestors_deleted_at=self.ancestors_deleted_at,
+            updated_at=self.updated_at,
+        )
 
         if self.depth > 1:
             self._meta.model.objects.filter(pk=self.get_parent().pk).update(
diff --git a/src/backend/core/services/search_indexers.py b/src/backend/core/services/search_indexers.py
index d2bf9ed064..f3d61fd3eb 100644
--- a/src/backend/core/services/search_indexers.py
+++ b/src/backend/core/services/search_indexers.py
@@ -1,5 +1,6 @@
 """Document search index management utilities and indexers"""
 
+import itertools
 import logging
 from abc import ABC, abstractmethod
 from collections import defaultdict
@@ -125,44 +126,45 @@ def __init__(self):
         if not self.search_url:
             raise ImproperlyConfigured("SEARCH_URL must be set in Django settings.")
 
-    def index(self, queryset=None, batch_size=None):
+    def index(self, queryset, batch_size=None, crash_safe_mode=False):
        """
         Fetch documents in batches, serialize them, and push to the search backend.
 
         Args:
-            queryset (optional): Document queryset
-                Defaults to all documents without filter.
+            queryset: Document queryset to index.
             batch_size (int, optional): Number of documents per batch.
                 Defaults to settings.SEARCH_INDEXER_BATCH_SIZE.
+            crash_safe_mode (bool, optional): If True, order documents by updated_at.
+                This allows resuming indexing from the last successful batch in case
+                of a crash, but is more computationally expensive due to sorting.
""" - last_id = 0 count = 0 - queryset = queryset or models.Document.objects.all() batch_size = batch_size or self.batch_size - while True: - documents_batch = list( - queryset.filter( - id__gt=last_id, - ).order_by("id")[:batch_size] - ) - - if not documents_batch: - break + if crash_safe_mode: + queryset = queryset.order_by("updated_at") + for documents_batch in itertools.batched(queryset.iterator(), batch_size): doc_paths = [doc.path for doc in documents_batch] - last_id = documents_batch[-1].id accesses_by_document_path = get_batch_accesses_by_users_and_teams(doc_paths) - serialized_batch = [ self.serialize_document(document, accesses_by_document_path) for document in documents_batch if document.content or document.title ] - if serialized_batch: - self.push(serialized_batch) - count += len(serialized_batch) + if not serialized_batch: + continue + + self.push(serialized_batch) + count += len(serialized_batch) + + if crash_safe_mode: + logger.info( + "Indexing checkpoint: %s.", + serialized_batch[-1]["updated_at"], + ) return count diff --git a/src/backend/core/tasks/search.py b/src/backend/core/tasks/search.py index e1c39e6bea..61e03f2aef 100644 --- a/src/backend/core/tasks/search.py +++ b/src/backend/core/tasks/search.py @@ -4,7 +4,6 @@ from django.conf import settings from django.core.cache import cache -from django.db.models import Q from django_redis.cache import RedisCache @@ -20,7 +19,12 @@ @app.task def document_indexer_task(document_id): - """Celery Task : Sends indexation query for a document.""" + """ + Celery Task: Indexes a single document by its ID. + + Args: + document_id: Primary key of the document to index. + """ indexer = get_document_indexer() if indexer: @@ -30,8 +34,17 @@ def document_indexer_task(document_id): def batch_indexer_throttle_acquire(timeout: int = 0, atomic: bool = True): """ - Enable the task throttle flag for a delay. - Uses redis locks if available to ensure atomic changes + Acquire a throttle lock to prevent multiple batch indexation tasks during countdown. + + implements a debouncing pattern: only the first call during the timeout period + will succeed, subsequent calls are skipped until the timeout expires. + + Args: + timeout (int): Lock duration in seconds (countdown period). + atomic (bool): Use Redis locks for atomic operations if available. + + Returns: + bool: True if lock acquired (first call), False if already held (subsequent calls). """ key = "document-batch-indexer-throttle" @@ -41,44 +54,65 @@ def batch_indexer_throttle_acquire(timeout: int = 0, atomic: bool = True): with cache.locks(key): return batch_indexer_throttle_acquire(timeout, atomic=False) - # Use add() here : - # - set the flag and returns true if not exist - # - do nothing and return false if exist + # cache.add() is atomic test-and-set operation: + # - If key doesn't exist: creates it with timeout and returns True + # - If key already exists: does nothing and returns False + # The key expires after timeout seconds, releasing the lock. + # The value 1 is irrelevant, only the key presence/absence matters. return cache.add(key, 1, timeout=timeout) @app.task -def batch_document_indexer_task(timestamp): - """Celery Task : Sends indexation query for a batch of documents.""" +def batch_document_indexer_task(lower_time_bound=None, upper_time_bound=None, **kwargs): + """ + Celery Task: Batch indexes all documents modified since timestamp. + + Args: + lower_time_bound (datetime, optional): + indexes documents updated or deleted after this timestamp. 
+        upper_time_bound (datetime, optional):
+            Indexes documents updated or deleted before this timestamp.
+    """
     indexer = get_document_indexer()
 
-    if indexer:
-        queryset = models.Document.objects.filter(
-            Q(updated_at__gte=timestamp)
-            | Q(deleted_at__gte=timestamp)
-            | Q(ancestors_deleted_at__gte=timestamp)
-        )
+    if not indexer:
+        logger.warning("Indexing task triggered but no indexer configured: skipping")
+        return
 
-        count = indexer.index(queryset)
-        logger.info("Indexed %d documents", count)
+    count = indexer.index(
+        queryset=models.Document.objects.filter_updated_at(
+            lower_time_bound=lower_time_bound, upper_time_bound=upper_time_bound
+        ),
+        **kwargs,
+    )
+    logger.info("Indexed %d documents", count)
 
 
 def trigger_batch_document_indexer(document):
     """
-    Trigger indexation task with debounce a delay set by the SEARCH_INDEXER_COUNTDOWN setting.
+    Trigger document indexation with an optional debounce mechanism.
+
+    Behavior depends on the SEARCH_INDEXER_COUNTDOWN setting:
+    - if countdown > 0 sec (async batch mode):
+        * schedules a batch indexation task after countdown seconds
+        * uses a throttle mechanism to ensure only ONE batch task runs per countdown period
+        * all documents modified since the first trigger are indexed together
+    - if countdown == 0 sec (sync mode):
+        * executes indexation synchronously in the current thread
+        * no batching, no throttling, no Celery task queuing
 
     Args:
-        document (Document): The document instance.
+        document (Document): The document instance that triggered the indexation.
     """
     countdown = int(settings.SEARCH_INDEXER_COUNTDOWN)
 
-    # DO NOT create a task if indexation if disabled
+    # DO NOT create a task if indexation is disabled
     if not settings.SEARCH_INDEXER_CLASS:
         return
 
     if countdown > 0:
-        # Each time this method is called during a countdown, we increment the
-        # counter and each task decrease it, so the index be run only once.
+        # Use the throttle to ensure only one task is scheduled per countdown period.
+        # If the throttle is acquired, schedule the batch task; otherwise skip.
         if batch_indexer_throttle_acquire(timeout=countdown):
             logger.info(
                 "Add task for batch document indexation from updated_at=%s in %d seconds",
@@ -87,7 +121,7 @@ def trigger_batch_document_indexer(document):
             )
 
             batch_document_indexer_task.apply_async(
-                args=[document.updated_at], countdown=countdown
+                kwargs={"lower_time_bound": document.updated_at}, countdown=countdown
             )
         else:
             logger.info("Skip task for batch document %s indexation", document.pk)
diff --git a/src/backend/core/templates/runindexing.html b/src/backend/core/templates/runindexing.html
new file mode 100644
index 0000000000..bb2c6c7cfb
--- /dev/null
+++ b/src/backend/core/templates/runindexing.html
@@ -0,0 +1,22 @@
+{% extends "admin/base_site.html" %}
+{% load i18n %}
+
+{% block content %}
+
+
+{% endblock %}
diff --git a/src/backend/core/tests/commands/test_index.py b/src/backend/core/tests/commands/test_index.py
index 78d3024958..d9adbb2050 100644
--- a/src/backend/core/tests/commands/test_index.py
+++ b/src/backend/core/tests/commands/test_index.py
@@ -2,21 +2,25 @@
 Unit test for `index` command.
 """
 
+import logging
+from datetime import datetime, timedelta, timezone
 from operator import itemgetter
 from unittest import mock
 
+from django.core.cache import cache
 from django.core.management import CommandError, call_command
 from django.db import transaction
 
 import pytest
 
 from core import factories
+from core.factories import DocumentFactory
 from core.services.search_indexers import FindDocumentIndexer
 
 
 @pytest.mark.django_db
 @pytest.mark.usefixtures("indexer_settings")
-def test_index():
+def test_index_without_bound_success():
     """Test the command `index` that run the Find app indexer for all the available documents."""
     user = factories.UserFactory()
     indexer = FindDocumentIndexer()
@@ -39,18 +43,152 @@ def test_index():
     with mock.patch.object(FindDocumentIndexer, "push") as mock_push:
         call_command("index")
 
-    push_call_args = [call.args[0] for call in mock_push.call_args_list]
+        push_call_args = [call.args[0] for call in mock_push.call_args_list]
 
-    # called once but with a batch of docs
-    mock_push.assert_called_once()
+        # called once but with a batch of docs
+        mock_push.assert_called_once()
 
-    assert sorted(push_call_args[0], key=itemgetter("id")) == sorted(
-        [
-            indexer.serialize_document(doc, accesses),
-            indexer.serialize_document(no_title_doc, accesses),
-        ],
-        key=itemgetter("id"),
+        assert sorted(push_call_args[0], key=itemgetter("id")) == sorted(
+            [
+                indexer.serialize_document(doc, accesses),
+                indexer.serialize_document(no_title_doc, accesses),
+            ],
+            key=itemgetter("id"),
+        )
+
+
+@pytest.mark.django_db
+@pytest.mark.usefixtures("indexer_settings")
+def test_index_with_both_bounds_success():
+    """Test the command `index` for all documents within the given time bounds."""
+    cache.clear()
+    lower_time_bound = datetime(2024, 2, 1, tzinfo=timezone.utc)
+    upper_time_bound = lower_time_bound + timedelta(days=30)
+
+    document_too_early = DocumentFactory(
+        updated_at=lower_time_bound - timedelta(days=10)
+    )
+    document_in_window_1 = DocumentFactory(
+        updated_at=lower_time_bound + timedelta(days=5)
+    )
+    document_in_window_2 = DocumentFactory(
+        updated_at=lower_time_bound + timedelta(days=15)
+    )
+    document_too_late = DocumentFactory(
+        updated_at=upper_time_bound + timedelta(days=10)
+    )
+
+    with mock.patch.object(FindDocumentIndexer, "push") as mock_push:
+        call_command(
+            "index",
+            lower_time_bound=lower_time_bound.isoformat(),
+            upper_time_bound=upper_time_bound.isoformat(),
+        )
+        pushed_document_ids = [
+            document["id"]
+            for call_arg_list in mock_push.call_args_list
+            for document in call_arg_list.args[0]
+        ]
+
+        # Only documents in window should be indexed
+        assert str(document_too_early.id) not in pushed_document_ids
+        assert str(document_in_window_1.id) in pushed_document_ids
+        assert str(document_in_window_2.id) in pushed_document_ids
+        assert str(document_too_late.id) not in pushed_document_ids
+
+
+@pytest.mark.django_db
+@pytest.mark.usefixtures("indexer_settings")
+def test_index_with_crash_recovery(caplog_with_propagate):
+    """Test resuming indexing from checkpoint after a crash."""
+    cache.clear()
+    lower_time_bound = datetime(2024, 2, 1, tzinfo=timezone.utc)
+    upper_time_bound = lower_time_bound + timedelta(days=60)
+
+    batch_size = 2
+    documents = [
+        # batch 0
+        factories.DocumentFactory(updated_at=lower_time_bound + timedelta(days=5)),
+        factories.DocumentFactory(updated_at=lower_time_bound + timedelta(days=10)),
+        # batch 1
+        factories.DocumentFactory(updated_at=lower_time_bound + timedelta(days=20)),
+        factories.DocumentFactory(updated_at=lower_time_bound + timedelta(days=25)),
+        # batch 2 - will crash here
+        factories.DocumentFactory(updated_at=lower_time_bound + timedelta(days=30)),
+        factories.DocumentFactory(updated_at=lower_time_bound + timedelta(days=35)),
+        # batch 3
+        factories.DocumentFactory(updated_at=lower_time_bound + timedelta(days=40)),
+        factories.DocumentFactory(updated_at=lower_time_bound + timedelta(days=45)),
+    ]
+
+    def push_with_failure_on_batch_2(data):
+        # Crash when encountering the document at index 4 (batch 2 with batch_size=2)
+        if str(documents[4].id) in [document["id"] for document in data]:
+            raise ConnectionError("Simulated indexing error")
+
+    # First run: simulate a crash on batch 2
+    with mock.patch.object(FindDocumentIndexer, "push") as mock_push:
+        mock_push.side_effect = push_with_failure_on_batch_2
+        with pytest.raises(CommandError):
+            with caplog_with_propagate.at_level(logging.INFO):
+                call_command(
+                    "index",
+                    batch_size=batch_size,
+                    lower_time_bound=lower_time_bound.isoformat(),
+                    upper_time_bound=upper_time_bound.isoformat(),
+                )
+        pushed_document_ids = [
+            document["id"]
+            for call_arg_list in mock_push.call_args_list
+            for document in call_arg_list.args[0]
+        ]
+
+        # The updated_at of the last document of each batch is logged as a checkpoint
+        # -> documents[3].updated_at is the most advanced checkpoint
+        for i in [1, 3]:
+            assert any(
+                f"Indexing checkpoint: {documents[i].updated_at.isoformat()}." in message
+                for message in caplog_with_propagate.messages
+            )
+        for i in [0, 2, 4, 5, 6]:
+            assert not any(
+                f"Indexing checkpoint: {documents[i].updated_at.isoformat()}" in message
+                for message in caplog_with_propagate.messages
+            )
+        # first 2 batches should be indexed successfully
+        for i in range(0, 4):
+            assert str(documents[i].id) in pushed_document_ids
+        # next batch should have been attempted but failed
+        for i in range(4, 6):
+            assert str(documents[i].id) in pushed_document_ids
+        # indexing of the last batches should not have been attempted
+        for i in range(6, 8):
+            assert str(documents[i].id) not in pushed_document_ids
+
+    # Second run: resume from checkpoint
+    with mock.patch.object(FindDocumentIndexer, "push") as mock_push:
+        call_command(
+            "index",
+            batch_size=batch_size,
+            lower_time_bound=documents[3].updated_at,
+            upper_time_bound=upper_time_bound.isoformat(),
        )
+        pushed_document_ids = [
+            document["id"]
+            for call_arg_list in mock_push.call_args_list
+            for document in call_arg_list.args[0]
+        ]
+
+        # first 2 batches should NOT be re-indexed,
+        # except the last document of the last batch, which is on the checkpoint boundary
+        # -> doc 0, 1 and 2
+        for i in range(0, 3):
+            assert str(documents[i].id) not in pushed_document_ids
+        # next batches should be indexed, including the document at the checkpoint
+        # boundary, which was already indexed and is re-indexed
+        # -> doc 3 to the end
+        for i in range(3, 8):
+            assert str(documents[i].id) in pushed_document_ids
 
 
 @pytest.mark.django_db
@@ -63,3 +201,57 @@ def test_index_improperly_configured(indexer_settings):
         call_command("index")
 
     assert str(err.value) == "The indexer is not enabled or properly configured."
+
+
+@pytest.mark.django_db
+@pytest.mark.usefixtures("indexer_settings")
+def test_index_with_async_flag(settings):
+    """Test the command `index` with --async=True runs the task asynchronously."""
+    cache.clear()
+    lower_time_bound = datetime(2024, 2, 1, tzinfo=timezone.utc)
+
+    with mock.patch(
+        "core.management.commands.index.batch_document_indexer_task"
+    ) as mock_task:
+        with mock.patch.object(FindDocumentIndexer, "push") as mock_push:
+            call_command(
+                "index", async_mode=True, lower_time_bound=lower_time_bound.isoformat()
+            )
+            # push must not be called synchronously
+            mock_push.assert_not_called()
+            # the task is dispatched asynchronously
+            mock_task.apply_async.assert_called_once_with(
+                kwargs={
+                    "lower_time_bound": lower_time_bound.isoformat(),
+                    "upper_time_bound": None,
+                    "batch_size": settings.SEARCH_INDEXER_BATCH_SIZE,
+                    "crash_safe_mode": True,
+                }
+            )
+
+
+@pytest.mark.django_db
+@pytest.mark.usefixtures("indexer_settings")
+def test_index_without_async_flag():
+    """Test the command `index` with --async=False runs synchronously."""
+    cache.clear()
+    lower_time_bound = datetime(2024, 2, 1, tzinfo=timezone.utc)
+
+    document = DocumentFactory(updated_at=lower_time_bound + timedelta(days=10))
+
+    with mock.patch(
+        "core.management.commands.index.batch_document_indexer_task"
+    ) as mock_task:
+        with mock.patch.object(FindDocumentIndexer, "push") as mock_push:
+            call_command(
+                "index", async_mode=False, lower_time_bound=lower_time_bound.isoformat()
+            )
+            # push is called synchronously to index the document
+            pushed_document_ids = [
+                document["id"]
+                for call_arg_list in mock_push.call_args_list
+                for document in call_arg_list.args[0]
+            ]
+            assert str(document.id) in pushed_document_ids
+            # the async task is not called
+            mock_task.apply_async.assert_not_called()
diff --git a/src/backend/core/tests/conftest.py b/src/backend/core/tests/conftest.py
index 0af57d9ff7..2a6525223a 100644
--- a/src/backend/core/tests/conftest.py
+++ b/src/backend/core/tests/conftest.py
@@ -1,6 +1,7 @@
 """Fixtures for tests in the impress core application"""
 
 import base64
+import logging
 from unittest import mock
 
 from django.core.cache import cache
@@ -22,6 +23,30 @@ def clear_cache():
     cache.clear()
 
 
+@pytest.fixture
+def caplog_with_propagate(settings, caplog):
+    """
+    Loggers configured in settings.LOGGING have propagate=False,
+    which prevents caplog from capturing their records.
+
+    This fixture enables propagation on all configured loggers.
+ """ + # Save original propagate state + original_propagate = {} + + for logger_name in settings.LOGGING.get("loggers", {}): + logger = logging.getLogger(logger_name) + original_propagate[logger_name] = logger.propagate + logger.propagate = True + + try: + yield caplog + finally: + # Restore original propagate states + for logger_name, original_value in original_propagate.items(): + logging.getLogger(logger_name).propagate = original_value + + @pytest.fixture def mock_user_teams(): """Mock for the "teams" property on the User model.""" diff --git a/src/backend/core/tests/documents/test_api_documents_delete.py b/src/backend/core/tests/documents/test_api_documents_delete.py index 776bbe1fdf..c107d28264 100644 --- a/src/backend/core/tests/documents/test_api_documents_delete.py +++ b/src/backend/core/tests/documents/test_api_documents_delete.py @@ -96,8 +96,9 @@ def test_api_documents_delete_authenticated_owner_of_ancestor(depth): ) assert models.Document.objects.count() == depth + document_to_delete = documents[-1] response = client.delete( - f"/api/v1.0/documents/{documents[-1].id}/", + f"/api/v1.0/documents/{document_to_delete.id}/", ) assert response.status_code == 204 @@ -105,7 +106,11 @@ def test_api_documents_delete_authenticated_owner_of_ancestor(depth): # Make sure it is only a soft delete assert models.Document.objects.count() == depth assert models.Document.objects.filter(deleted_at__isnull=True).count() == depth - 1 - assert models.Document.objects.filter(deleted_at__isnull=False).count() == 1 + deleted_documents = models.Document.objects.filter(deleted_at__isnull=False) + assert len(deleted_documents) == 1 + deleted_document = deleted_documents[0] + # updated_at is updated by the soft delete + assert deleted_document.updated_at > document_to_delete.updated_at @pytest.mark.parametrize("via", VIA) diff --git a/src/backend/core/tests/documents/test_api_documents_restore.py b/src/backend/core/tests/documents/test_api_documents_restore.py index 5ae64aec12..56aec6b705 100644 --- a/src/backend/core/tests/documents/test_api_documents_restore.py +++ b/src/backend/core/tests/documents/test_api_documents_restore.py @@ -91,11 +91,15 @@ def test_api_documents_restore_authenticated_owner_ancestor_deleted(): factories.UserDocumentAccessFactory(document=document, user=user, role="owner") document.soft_delete() + document.refresh_from_db() document_deleted_at = document.deleted_at + document_updated_at = document.updated_at + assert document_deleted_at is not None grand_parent.soft_delete() grand_parent_deleted_at = grand_parent.deleted_at + assert grand_parent_deleted_at is not None response = client.post(f"/api/v1.0/documents/{document.id!s}/restore/") @@ -105,6 +109,8 @@ def test_api_documents_restore_authenticated_owner_ancestor_deleted(): document.refresh_from_db() assert document.deleted_at is None + # document is updated by restore + assert document.updated_at > document_updated_at # document is still marked as deleted assert document.ancestors_deleted_at == grand_parent_deleted_at assert grand_parent_deleted_at > document_deleted_at diff --git a/src/backend/core/tests/test_admin_run_indexing.py b/src/backend/core/tests/test_admin_run_indexing.py new file mode 100644 index 0000000000..d378855574 --- /dev/null +++ b/src/backend/core/tests/test_admin_run_indexing.py @@ -0,0 +1,54 @@ +"""Tests for run_indexing_view admin endpoint.""" + +from unittest.mock import patch + +from django.http import HttpResponse + +import pytest + +from core import factories + + 
+@pytest.mark.usefixtures("indexer_settings") +@pytest.mark.django_db +@pytest.mark.parametrize( + "is_authenticated,is_staff,should_call_command", + [ + (False, False, False), + (True, False, False), + (True, True, True), + ], +) +def test_run_indexing_view_post_authentication( + client, + is_authenticated, + is_staff, + should_call_command, +): + """Test that POST to run_indexing_view requires staff authentication.""" + + if is_authenticated: + user = factories.UserFactory(is_staff=is_staff) + client.force_login(user) + + batch_size = 100 + with patch("core.admin.call_command") as mock_call_command: + mock_call_command.return_value = HttpResponse("Mocked render") + response = client.post("/admin/run-indexing/", {"batch_size": batch_size}) + + # redirects in all cases + assert response.status_code == 302 + + if should_call_command: + assert "/admin/run-indexing/" == response.url + mock_call_command.assert_called_once() + assert mock_call_command.call_args.kwargs == { + "batch_size": batch_size, + "lower_time_bound": None, + "upper_time_bound": None, + "async_mode": True, + } + + else: + assert "/admin/login/" in response.url + mock_call_command.assert_not_called() diff --git a/src/backend/core/tests/test_models_documents.py b/src/backend/core/tests/test_models_documents.py index 82edf82d9a..64a80689cc 100644 --- a/src/backend/core/tests/test_models_documents.py +++ b/src/backend/core/tests/test_models_documents.py @@ -5,6 +5,8 @@ import random import smtplib +from datetime import datetime, timedelta +from datetime import timezone as base_timezone from logging import Logger from unittest import mock @@ -19,6 +21,7 @@ import pytest from core import factories, models +from core.factories import DocumentFactory pytestmark = pytest.mark.django_db @@ -87,7 +90,8 @@ def test_models_documents_tree_alphabet(): @pytest.mark.parametrize("depth", range(5)) def test_models_documents_soft_delete(depth): - """Trying to delete a document that is already deleted or is a descendant of + """ + Trying to delete a document that is already deleted or is a descendant of a deleted document should raise an error. """ documents = [] @@ -99,6 +103,8 @@ def test_models_documents_soft_delete(depth): ) assert models.Document.objects.count() == depth + 1 + document_pk_to_updated_at = {d.pk: d.updated_at for d in documents} + # Delete any one of the documents... 
deleted_document = random.choice(documents)
     deleted_document.soft_delete()
 
@@ -106,19 +112,26 @@
     with pytest.raises(RuntimeError):
         documents[-1].soft_delete()
 
+    deleted_document.refresh_from_db()
     assert deleted_document.deleted_at is not None
     assert deleted_document.ancestors_deleted_at == deleted_document.deleted_at
+    # updated_at is updated on the deleted document
+    assert deleted_document.updated_at > document_pk_to_updated_at[deleted_document.pk]
 
     descendants = deleted_document.get_descendants()
     for child in descendants:
         assert child.deleted_at is None
         assert child.ancestors_deleted_at is not None
         assert child.ancestors_deleted_at == deleted_document.deleted_at
+        # updated_at is updated on descendants
+        assert child.updated_at > document_pk_to_updated_at[child.pk]
 
     ancestors = deleted_document.get_ancestors()
     for parent in ancestors:
         assert parent.deleted_at is None
         assert parent.ancestors_deleted_at is None
+        # updated_at is not affected on parents
+        assert parent.updated_at == document_pk_to_updated_at[parent.pk]
 
     assert len(ancestors) + len(descendants) == depth
 
@@ -1419,16 +1432,20 @@ def test_models_documents_restore_tempering_with_instance():
 def test_models_documents_restore(django_assert_num_queries):
     """The restore method should restore a soft-deleted document."""
     document = factories.DocumentFactory()
+
     document.soft_delete()
     document.refresh_from_db()
     assert document.deleted_at is not None
     assert document.ancestors_deleted_at == document.deleted_at
+    original_updated_after_delete = document.updated_at
 
     with django_assert_num_queries(10):
         document.restore()
 
     document.refresh_from_db()
     assert document.deleted_at is None
-    assert document.ancestors_deleted_at == document.deleted_at
+    assert document.ancestors_deleted_at is None
+    # updated_at is updated by restore
+    assert original_updated_after_delete < document.updated_at
 
 
 def test_models_documents_restore_complex(django_assert_num_queries):
@@ -1445,6 +1462,7 @@ def test_models_documents_restore_complex(django_assert_num_queries):
     document.refresh_from_db()
     child1.refresh_from_db()
     child2.refresh_from_db()
+
     assert document.deleted_at is not None
     assert document.ancestors_deleted_at == document.deleted_at
     assert child1.ancestors_deleted_at == document.deleted_at
@@ -1454,13 +1472,18 @@
     grand_parent.soft_delete()
     grand_parent.refresh_from_db()
     parent.refresh_from_db()
+    document.refresh_from_db()
+    child1.refresh_from_db()
+    child2.refresh_from_db()
+    grand_parent_updated_at = grand_parent.updated_at
+    document_updated_at = document.updated_at
+    child1_updated_at = child1.updated_at
+    child2_updated_at = child2.updated_at
+
     assert grand_parent.deleted_at is not None
     assert grand_parent.ancestors_deleted_at == grand_parent.deleted_at
     assert parent.ancestors_deleted_at == grand_parent.deleted_at
     # item, child1 and child2 should not be affected
-    document.refresh_from_db()
-    child1.refresh_from_db()
-    child2.refresh_from_db()
     assert document.deleted_at is not None
     assert document.ancestors_deleted_at == document.deleted_at
     assert child1.ancestors_deleted_at == document.deleted_at
@@ -1469,15 +1492,23 @@
     # Restore the item
     with django_assert_num_queries(14):
         document.restore()
+
     document.refresh_from_db()
     child1.refresh_from_db()
     child2.refresh_from_db()
     grand_parent.refresh_from_db()
+
     assert document.deleted_at is None
     assert document.ancestors_deleted_at == grand_parent.deleted_at
     # child 1 and child 2 should now have the same ancestors_deleted_at as the grand parent
     assert child1.ancestors_deleted_at == grand_parent.deleted_at
     assert child2.ancestors_deleted_at == grand_parent.deleted_at
+    # updated_at is updated for document and children after restore
+    assert document.updated_at > document_updated_at
+    assert child1.updated_at > child1_updated_at
+    assert child2.updated_at > child2_updated_at
+    # grand_parent updated_at is not affected
+    assert grand_parent.updated_at == grand_parent_updated_at
 
 
 def test_models_documents_restore_complex_bis(django_assert_num_queries):
@@ -1485,31 +1516,37 @@
     grand_parent = factories.DocumentFactory()
     parent = factories.DocumentFactory(parent=grand_parent)
     document = factories.DocumentFactory(parent=parent)
-
     child1 = factories.DocumentFactory(parent=document)
     child2 = factories.DocumentFactory(parent=document)
 
     # Soft delete first the document
     document.soft_delete()
+
     document.refresh_from_db()
     child1.refresh_from_db()
     child2.refresh_from_db()
+
     assert document.deleted_at is not None
     assert document.ancestors_deleted_at == document.deleted_at
     assert child1.ancestors_deleted_at == document.deleted_at
     assert child2.ancestors_deleted_at == document.deleted_at
 
-    # Soft delete the grand parent
+    # Soft delete the grand_parent
     grand_parent.soft_delete()
+
     grand_parent.refresh_from_db()
     parent.refresh_from_db()
+    document.refresh_from_db()
+    child1.refresh_from_db()
+    child2.refresh_from_db()
+    original_parent_updated_at = parent.updated_at
+    original_child1_updated_at = child1.updated_at
+    original_child2_updated_at = child2.updated_at
+
     assert grand_parent.deleted_at is not None
     assert grand_parent.ancestors_deleted_at == grand_parent.deleted_at
     assert parent.ancestors_deleted_at == grand_parent.deleted_at
     # item, child1 and child2 should not be affected
-    document.refresh_from_db()
-    child1.refresh_from_db()
-    child2.refresh_from_db()
     assert document.deleted_at is not None
     assert document.ancestors_deleted_at == document.deleted_at
     assert child1.ancestors_deleted_at == document.deleted_at
@@ -1525,14 +1562,20 @@
     document.refresh_from_db()
     child1.refresh_from_db()
     child2.refresh_from_db()
+
     assert grand_parent.deleted_at is None
     assert grand_parent.ancestors_deleted_at is None
     assert parent.deleted_at is None
     assert parent.ancestors_deleted_at is None
+    # parent should have updated_at updated (descendant of restored grand_parent)
+    assert parent.updated_at > original_parent_updated_at
     assert document.deleted_at is not None
     assert document.ancestors_deleted_at == document.deleted_at
+    # children are not restored, so their updated_at should not be affected
     assert child1.ancestors_deleted_at == document.deleted_at
+    assert child1.updated_at == original_child1_updated_at
     assert child2.ancestors_deleted_at == document.deleted_at
+    assert child2.updated_at == original_child2_updated_at
 
 
 @pytest.mark.parametrize(
@@ -1691,3 +1734,82 @@ def test_models_documents_compute_ancestors_links_paths_mapping_structure(
             {"link_reach": sibling.link_reach, "link_role": sibling.link_role},
         ],
     }
+
+
+def test_models_documents_manager_time_filter_no_filters():
+    """Test filter_updated_at with no bounds returns all documents."""
+    factories.DocumentFactory.create_batch(3)
+
+    queryset = models.Document.objects.filter_updated_at()
+
+    assert queryset.count() == 3
+
+
+def test_models_documents_manager_time_filter_oldest_updated_at():
+    """
+    Test filtering by lower_time_bound includes documents updated at or after
+    the bound.
+    """
+    lower_time_bound = datetime(2024, 2, 1, tzinfo=base_timezone.utc)
+
+    DocumentFactory(updated_at=lower_time_bound - timedelta(days=30))
+    document_at_boundary = DocumentFactory(updated_at=lower_time_bound)
+    document_recent = DocumentFactory(updated_at=lower_time_bound + timedelta(days=15))
+
+    queryset = models.Document.objects.filter_updated_at(
+        lower_time_bound=lower_time_bound
+    )
+
+    assert queryset.count() == 2
+    assert sorted(queryset.values_list("pk", flat=True)) == sorted(
+        [document_at_boundary.pk, document_recent.pk]
+    )
+
+
+def test_models_documents_manager_time_filter_newest_updated_at():
+    """Test filtering by upper_time_bound includes documents updated at or before the bound."""
+    upper_time_bound = datetime(2024, 2, 1, tzinfo=base_timezone.utc)
+
+    document_old = DocumentFactory(updated_at=upper_time_bound - timedelta(days=30))
+    document_at_boundary = DocumentFactory(updated_at=upper_time_bound)
+    document_too_recent = DocumentFactory(
+        updated_at=upper_time_bound + timedelta(days=15)
+    )
+
+    queryset = models.Document.objects.filter_updated_at(
+        upper_time_bound=upper_time_bound
+    )
+
+    assert queryset.count() == 2
+    assert document_old in queryset
+    assert document_at_boundary in queryset
+    assert document_too_recent not in queryset
+
+
+def test_models_documents_manager_time_filter_both_bounds():
+    """Test filtering with both lower and upper bounds."""
+    lower_time_bound = datetime(2024, 2, 1, tzinfo=base_timezone.utc)
+    upper_time_bound = lower_time_bound + timedelta(days=30)
+
+    document_too_early = DocumentFactory(
+        updated_at=lower_time_bound - timedelta(days=10)
+    )
+    document_in_window = DocumentFactory(
+        updated_at=lower_time_bound + timedelta(days=5)
+    )
+    other_document_in_window = DocumentFactory(
+        updated_at=lower_time_bound + timedelta(days=15)
+    )
+    document_too_late = DocumentFactory(
+        updated_at=upper_time_bound + timedelta(days=10)
+    )
+
+    queryset = models.Document.objects.filter_updated_at(
+        lower_time_bound=lower_time_bound, upper_time_bound=upper_time_bound
+    )
+
+    assert queryset.count() == 2
+    assert document_too_early not in queryset
+    assert document_in_window in queryset
+    assert other_document_in_window in queryset
+    assert document_too_late not in queryset
diff --git a/src/backend/core/tests/test_services_search_indexers.py b/src/backend/core/tests/test_services_search_indexers.py
index 01131e3999..c758b09d2b 100644
--- a/src/backend/core/tests/test_services_search_indexers.py
+++ b/src/backend/core/tests/test_services_search_indexers.py
@@ -252,7 +252,7 @@ def test_services_search_indexers_index_errors(indexer_settings):
     )
 
     with pytest.raises(HTTPError):
-        FindDocumentIndexer().index()
+        FindDocumentIndexer().index(models.Document.objects.all())
 
 
 @patch.object(FindDocumentIndexer, "push")
@@ -272,7 +272,7 @@ def test_services_search_indexers_batches_pass_only_batch_accesses(
         access = factories.UserDocumentAccessFactory(document=document)
         expected_user_subs[str(document.id)] = str(access.user.sub)
 
-    assert FindDocumentIndexer().index() == 5
+    assert FindDocumentIndexer().index(models.Document.objects.all()) == 5
 
     # Should be 3 batches: 2 + 2 + 1
     assert mock_push.call_count == 3
@@ -310,7 +310,7 @@ def test_services_search_indexers_batch_size_argument(mock_push):
         access = factories.UserDocumentAccessFactory(document=document)
expected_user_subs[str(document.id)] = str(access.user.sub)
 
-    assert FindDocumentIndexer().index(batch_size=2) == 5
+    assert FindDocumentIndexer().index(models.Document.objects.all(), batch_size=2) == 5
 
     # Should be 3 batches: 2 + 2 + 1
     assert mock_push.call_count == 3
@@ -345,7 +345,7 @@ def test_services_search_indexers_ignore_empty_documents(mock_push):
     empty_title = factories.DocumentFactory(title="")
     empty_content = factories.DocumentFactory(content="")
 
-    assert FindDocumentIndexer().index() == 3
+    assert FindDocumentIndexer().index(models.Document.objects.all()) == 3
 
     assert mock_push.call_count == 1
 
@@ -373,7 +373,7 @@ def test_services_search_indexers_skip_empty_batches(mock_push, indexer_settings
     # Only empty docs
     factories.DocumentFactory.create_batch(5, content="", title="")
 
-    assert FindDocumentIndexer().index() == 1
+    assert FindDocumentIndexer().index(models.Document.objects.all()) == 1
 
     assert mock_push.call_count == 1
 
     results = [doc["id"] for doc in mock_push.call_args[0][0]]
@@ -391,7 +391,7 @@ def test_services_search_indexers_ancestors_link_reach(mock_push):
     parent = factories.DocumentFactory(parent=grand_parent, link_reach="public")
     document = factories.DocumentFactory(parent=parent, link_reach="restricted")
 
-    assert FindDocumentIndexer().index() == 4
+    assert FindDocumentIndexer().index(models.Document.objects.all()) == 4
 
     results = {doc["id"]: doc for doc in mock_push.call_args[0][0]}
     assert len(results) == 4
@@ -411,7 +411,7 @@ def test_services_search_indexers_ancestors_users(mock_push):
     parent = factories.DocumentFactory(parent=grand_parent, users=[user_p])
     document = factories.DocumentFactory(parent=parent, users=[user_d])
 
-    assert FindDocumentIndexer().index() == 3
+    assert FindDocumentIndexer().index(models.Document.objects.all()) == 3
 
     results = {doc["id"]: doc for doc in mock_push.call_args[0][0]}
     assert len(results) == 3
@@ -432,7 +432,7 @@ def test_services_search_indexers_ancestors_teams(mock_push):
     parent = factories.DocumentFactory(parent=grand_parent, teams=["team_p"])
     document = factories.DocumentFactory(parent=parent, teams=["team_d"])
 
-    assert FindDocumentIndexer().index() == 3
+    assert FindDocumentIndexer().index(models.Document.objects.all()) == 3
 
     results = {doc["id"]: doc for doc in mock_push.call_args[0][0]}
     assert len(results) == 3
diff --git a/src/backend/impress/urls.py b/src/backend/impress/urls.py
index 2c5964d426..bc19876748 100644
--- a/src/backend/impress/urls.py
+++ b/src/backend/impress/urls.py
@@ -12,7 +12,12 @@
     SpectacularSwaggerView,
 )
 
+from core.admin import run_indexing_view
+
 urlpatterns = [
+    # Declared before "admin/" so the admin site's catch-all view does not
+    # shadow this custom route.
+    path("admin/run-indexing/", run_indexing_view, name="run_indexing"),
     path("admin/", admin.site.urls),
     path("", include("core.urls")),
 ]