Перейти к содержанию

Фоновые задачи Celery

Модуль tasks.py содержит фоновые задачи, выполняемые обработчиками Celery. Задачи работают в отдельных процессах и используют синхронные подключения к MongoDB и MinIO. Это отличает их от основной серверной части, которая работает асинхронно через motor и aiobotocore.

Архитектура исполнения

Фоновые задачи запускаются из маршрутов серверной части через вызов .delay() или .apply_async(). Очередь задач хранится в Redis (настройка CELERY_BROKER_URL). Один или несколько процессов-обработчиков (celery worker) забирают задачи из очереди и выполняют их.

Каждая задача:

  • создаёт собственное синхронное подключение к MongoDB через MongoClient (функция _get_mongo_client);
  • при необходимости создаёт подключение к MinIO через Minio (функция _get_minio_client);
  • сообщает о прогрессе через self.update_state(state='PROGRESS', meta={...}) --- серверная часть опрашивает этот прогресс и передаёт его пользовательскому интерфейсу;
  • при ошибке обновляет статус реестра в базе на error и сохраняет текст ошибки.

Задача импорта строк реестра

Задача import_registry_rows_task выполняет те же действия, что и класс RegistryImporter из модуля registry_import.py (см. Импорт и поиск по реестру), но в синхронном режиме внутри процесса Celery.

Последовательность действий

  1. Проверка статуса реестра: задача выполняется только если статус реестра --- importing. Если статус другой, задача завершается без действий.
  2. Загрузка файла Excel из MinIO.
  3. Очистка файла: из архива Excel удаляются теги autoFilter, которые в некоторых файлах вызывают ошибки при открытии через openpyxl.
  4. Разбор листа Excel с учётом настроек:
    • header_row --- номер строки заголовков;
    • data_start_row --- номер первой строки данных;
    • fill_down_columns --- колонки для заполнения пропусков сверху вниз;
    • column_header_row --- словарь индивидуальных заголовочных строк для отдельных колонок (полезно, когда у разных колонок разное начало данных).
  5. Применение заполнения пропусков сверху вниз (_apply_fill_down).
  6. Проверка: если у реестра уже есть обработанные строки (со статусом отличным от pending) и параметр force не установлен, импорт пропускается, чтобы не затереть результаты поиска.
  7. Удаление старых строк.
  8. Фильтрация строк по правилам:
    • пропуск строк, содержащих ключевые слова из списка skip_rows_containing (например, «ИТОГО», «Всего»);
    • пропуск строк с пустым значением в обязательной колонке (required_column_index);
    • пропуск строк с количеством заполненных ячеек менее min_filled_cells.
  9. Формирование трёх представлений каждой строки (raw_data, mapped_data, normalized) и вставка в базу порциями по 500.
  10. Обновление статистики реестра и перевод статуса в configured.

Отфильтрованные строки (до 50 штук) сохраняются в поле filtered_rows_data реестра --- это позволяет пользователю увидеть, какие строки были пропущены, и при необходимости скорректировать настройки фильтрации.

Задача поиска документов по реестру

Задача search_registry_task выполняет поиск документов для всех строк реестра. Задача имеет ограничение по времени: 60 минут (мягкое) и 65 минут (жёсткое, после которого процесс завершается принудительно).

Ограничение параллельности

При запуске задача регистрируется через register_task("registry_search", registry_id), при завершении --- через unregister_task. Модуль workload_manager.py ведёт учёт активных задач, что позволяет серверной части контролировать нагрузку (например, запрещать одновременный поиск по нескольким реестрам).

Последовательность действий

  1. Проверка статуса реестра: допустимы статусы searching, configured, completed.
  2. Загрузка активных правил сопоставления. Если правил нет, но есть поля старого формата --- выполняется миграция на лету (старые поля оборачиваются в одно правило).
  3. Сбор путей к полям документов из всех правил (включая подправила).
  4. Загрузка документов из коллекции documents (все, у которых хотя бы одно нужное поле заполнено).
  5. Построение прямого и обратного указателей документов (аналогично RegistrySearcher, см. Импорт и поиск по реестру).
  6. Обработка строк реестра по одной. Для каждой строки:
    • поиск кандидатов через обратный указатель (фаза 1);
    • подсчёт баллов по кандидатам (фаза 2);
    • объединение результатов нескольких правил;
    • формирование предупреждений по суммам.
  7. Запись результатов порциями по 200 через bulk_write.
  8. Обновление статистики реестра и перевод статуса в completed.

При обработке полей с данными из содержимого документа и из имени файла применяется приоритет: если у документа есть данные из содержимого (from_content.*), поля из имени файла (from_filename.*) игнорируются.

Отчёт о прогрессе

Каждые 100 строк задача обновляет прогресс через self.update_state. Серверная часть опрашивает Celery через AsyncResult(task_id) и передаёт текущий прогресс пользовательскому интерфейсу. Прогресс содержит:

  • progress --- процент выполнения (от 0 до 100);
  • message --- текстовое описание текущего действия.

Обработка ошибок

При любой ошибке задача:

  1. Записывает ошибку в журнал с полным стеком вызовов.
  2. Обновляет статус реестра в базе на error.
  3. Сохраняет текст ошибки в поле task_error реестра.
  4. Повторно выбрасывает исключение, чтобы Celery пометил задачу как неуспешную.

Задача поиска дополнительно снимает регистрацию в workload_manager в блоке finally, гарантируя освобождение ресурса даже при ошибке.

Старая задача поиска

Задача search_documents_task --- устаревший вариант поиска, работающий по одному столбцу (column_index) и одному набору маппингов (mapping_id). Она сохранена для обратной совместимости с ранними версиями программы. В новой архитектуре мастера реестра используется search_registry_task, которая работает сразу со всеми правилами.

Основные отличия от нового поиска:

  • работает по одной колонке, а не по всем правилам сразу;
  • результаты сохраняются в отдельную коллекцию registry_match_results;
  • не поддерживает мультиправила и подправила.

Вспомогательные функции

Функция Назначение
_get_mongo_client() Создаёт синхронное подключение к MongoDB по переменным окружения
_get_minio_client() Создаёт подключение к MinIO по переменным окружения
_download_file_sync() Скачивает файл из MinIO синхронно
_get_sheet_data_sync() Парсит лист Excel и возвращает массив строк
_apply_fill_down() Заполняет пустые ячейки значением последней непустой ячейки выше
_normalize_text() Приведение текста к нижнему регистру, удаление знаков препинания
_normalize_number() Извлечение цифр из строки
_normalize_date() Приведение даты к формату ГГГГ-ММ-ДД
levenshtein_similarity() Сходство строк на основе расстояния Левенштейна (от 0 до 1)
_build_document_index_for_search() Построение прямого указателя документов
_build_reverse_index() Построение обратного указателя для быстрого поиска кандидатов
_find_candidates() Поиск документов-кандидатов через обратный указатель
_compare_field_values() Сравнение значений полей (аналог _compare_values из RegistrySearcher)