Фоновые задачи Celery¶
Модуль tasks.py содержит фоновые задачи, выполняемые обработчиками Celery. Задачи работают в отдельных процессах и используют синхронные подключения к MongoDB и MinIO. Это отличает их от основной серверной части, которая работает асинхронно через motor и aiobotocore.
Архитектура исполнения¶
Фоновые задачи запускаются из маршрутов серверной части через вызов .delay() или .apply_async(). Очередь задач хранится в Redis (настройка CELERY_BROKER_URL). Один или несколько процессов-обработчиков (celery worker) забирают задачи из очереди и выполняют их.
Каждая задача:
- создаёт собственное синхронное подключение к MongoDB через
MongoClient(функция_get_mongo_client); - при необходимости создаёт подключение к MinIO через
Minio(функция_get_minio_client); - сообщает о прогрессе через
self.update_state(state='PROGRESS', meta={...})--- серверная часть опрашивает этот прогресс и передаёт его пользовательскому интерфейсу; - при ошибке обновляет статус реестра в базе на
errorи сохраняет текст ошибки.
Задача импорта строк реестра¶
Задача import_registry_rows_task выполняет те же действия, что и класс RegistryImporter из модуля registry_import.py (см. Импорт и поиск по реестру), но в синхронном режиме внутри процесса Celery.
Последовательность действий¶
- Проверка статуса реестра: задача выполняется только если статус реестра ---
importing. Если статус другой, задача завершается без действий. - Загрузка файла Excel из MinIO.
- Очистка файла: из архива Excel удаляются теги
autoFilter, которые в некоторых файлах вызывают ошибки при открытии черезopenpyxl. - Разбор листа Excel с учётом настроек:
header_row--- номер строки заголовков;data_start_row--- номер первой строки данных;fill_down_columns--- колонки для заполнения пропусков сверху вниз;column_header_row--- словарь индивидуальных заголовочных строк для отдельных колонок (полезно, когда у разных колонок разное начало данных).
- Применение заполнения пропусков сверху вниз (
_apply_fill_down). - Проверка: если у реестра уже есть обработанные строки (со статусом отличным от
pending) и параметрforceне установлен, импорт пропускается, чтобы не затереть результаты поиска. - Удаление старых строк.
- Фильтрация строк по правилам:
- пропуск строк, содержащих ключевые слова из списка
skip_rows_containing(например, «ИТОГО», «Всего»); - пропуск строк с пустым значением в обязательной колонке (
required_column_index); - пропуск строк с количеством заполненных ячеек менее
min_filled_cells.
- пропуск строк, содержащих ключевые слова из списка
- Формирование трёх представлений каждой строки (
raw_data,mapped_data,normalized) и вставка в базу порциями по 500. - Обновление статистики реестра и перевод статуса в
configured.
Отфильтрованные строки (до 50 штук) сохраняются в поле filtered_rows_data реестра --- это позволяет пользователю увидеть, какие строки были пропущены, и при необходимости скорректировать настройки фильтрации.
Задача поиска документов по реестру¶
Задача search_registry_task выполняет поиск документов для всех строк реестра. Задача имеет ограничение по времени: 60 минут (мягкое) и 65 минут (жёсткое, после которого процесс завершается принудительно).
Ограничение параллельности¶
При запуске задача регистрируется через register_task("registry_search", registry_id), при завершении --- через unregister_task. Модуль workload_manager.py ведёт учёт активных задач, что позволяет серверной части контролировать нагрузку (например, запрещать одновременный поиск по нескольким реестрам).
Последовательность действий¶
- Проверка статуса реестра: допустимы статусы
searching,configured,completed. - Загрузка активных правил сопоставления. Если правил нет, но есть поля старого формата --- выполняется миграция на лету (старые поля оборачиваются в одно правило).
- Сбор путей к полям документов из всех правил (включая подправила).
- Загрузка документов из коллекции
documents(все, у которых хотя бы одно нужное поле заполнено). - Построение прямого и обратного указателей документов (аналогично
RegistrySearcher, см. Импорт и поиск по реестру). - Обработка строк реестра по одной. Для каждой строки:
- поиск кандидатов через обратный указатель (фаза 1);
- подсчёт баллов по кандидатам (фаза 2);
- объединение результатов нескольких правил;
- формирование предупреждений по суммам.
- Запись результатов порциями по 200 через
bulk_write. - Обновление статистики реестра и перевод статуса в
completed.
При обработке полей с данными из содержимого документа и из имени файла применяется приоритет: если у документа есть данные из содержимого (from_content.*), поля из имени файла (from_filename.*) игнорируются.
Отчёт о прогрессе¶
Каждые 100 строк задача обновляет прогресс через self.update_state. Серверная часть опрашивает Celery через AsyncResult(task_id) и передаёт текущий прогресс пользовательскому интерфейсу. Прогресс содержит:
progress--- процент выполнения (от 0 до 100);message--- текстовое описание текущего действия.
Обработка ошибок¶
При любой ошибке задача:
- Записывает ошибку в журнал с полным стеком вызовов.
- Обновляет статус реестра в базе на
error. - Сохраняет текст ошибки в поле
task_errorреестра. - Повторно выбрасывает исключение, чтобы Celery пометил задачу как неуспешную.
Задача поиска дополнительно снимает регистрацию в workload_manager в блоке finally, гарантируя освобождение ресурса даже при ошибке.
Старая задача поиска¶
Задача search_documents_task --- устаревший вариант поиска, работающий по одному столбцу (column_index) и одному набору маппингов (mapping_id). Она сохранена для обратной совместимости с ранними версиями программы. В новой архитектуре мастера реестра используется search_registry_task, которая работает сразу со всеми правилами.
Основные отличия от нового поиска:
- работает по одной колонке, а не по всем правилам сразу;
- результаты сохраняются в отдельную коллекцию
registry_match_results; - не поддерживает мультиправила и подправила.
Вспомогательные функции¶
| Функция | Назначение |
|---|---|
_get_mongo_client() |
Создаёт синхронное подключение к MongoDB по переменным окружения |
_get_minio_client() |
Создаёт подключение к MinIO по переменным окружения |
_download_file_sync() |
Скачивает файл из MinIO синхронно |
_get_sheet_data_sync() |
Парсит лист Excel и возвращает массив строк |
_apply_fill_down() |
Заполняет пустые ячейки значением последней непустой ячейки выше |
_normalize_text() |
Приведение текста к нижнему регистру, удаление знаков препинания |
_normalize_number() |
Извлечение цифр из строки |
_normalize_date() |
Приведение даты к формату ГГГГ-ММ-ДД |
levenshtein_similarity() |
Сходство строк на основе расстояния Левенштейна (от 0 до 1) |
_build_document_index_for_search() |
Построение прямого указателя документов |
_build_reverse_index() |
Построение обратного указателя для быстрого поиска кандидатов |
_find_candidates() |
Поиск документов-кандидатов через обратный указатель |
_compare_field_values() |
Сравнение значений полей (аналог _compare_values из RegistrySearcher) |