Перейти к содержанию

Партионная обработка

Модули batch_service.py, batch_worker.py и workload_manager.py реализуют пакетную обработку документов через Anthropic Batch API. Пакетный режим позволяет отправлять запросы к языковой модели Claude асинхронно и получать результаты с задержкой от 5 до 60 минут, но со скидкой 50 % на стоимость токенов.

Зачем нужна партионная обработка

В синхронном режиме каждый запрос к Claude выполняется немедленно: серверная часть отправляет изображения страниц и ждёт ответа. Это удобно для единичных файлов, но при массовой загрузке (десятки и сотни файлов) синхронный режим обходится вдвое дороже.

Пакетный режим объединяет несколько запросов в одну партию (batch), отправляет её через специальную точку Anthropic Batch API и периодически проверяет готовность. Экономия достигается за счёт того, что Anthropic обрабатывает пакетные запросы в менее загруженное время. Гарантированное время выполнения --- до 24 часов, на практике результаты обычно приходят за 5--60 минут.

Режим работы определяется настройкой PROCESSING_MODE:

  • sync --- немедленные запросы к Claude, результат сразу;
  • batch --- очередь с пакетной отправкой, результат через 5--60 минут.

Формирование партии запросов

Подготовка одного задания

Модуль batch_service.py (класс BatchService) превращает каждое задание (job) в набор запросов к Claude:

  1. Из хранилища MinIO скачивается исходный файл.
  2. Страницы файла преобразуются в изображения с помощью PDFProcessor.
  3. Если включён сервис Google Cloud Vision, для каждой страницы выполняется оптическое распознавание текста и определение поворота. Результаты распознавания сохраняются в базу данных сразу после получения, чтобы при повторной обработке не обращаться к сервису распознавания заново.
  4. Страницы разбиваются на группы по 8 штук (параметр batch_size в классе). Каждая группа становится одним запросом к Claude.
  5. Для каждого запроса формируется содержимое: изображения страниц в формате base64 и текстовая подсказка с пятью задачами (сегментация, классификация, извлечение реквизитов из содержимого, разбор имени файла, поиск рукописных номеров).

Каждому запросу присваивается идентификатор вида job_{id задания}_batch_{номер группы}, по которому позже результаты связываются обратно с заданием.

Параллельная подготовка нескольких заданий

Метод submit_multiple_jobs обрабатывает несколько заданий одновременно. Параллелизм ограничен семафором с числом разрешений, равным переменной окружения OCR_WORKERS. Это контролирует нагрузку на процессор во время преобразования страниц и оптического распознавания.

Автоматическое разбиение на несколько партий

У Anthropic Batch API есть ограничения:

  • максимальный размер партии --- 256 МБ (в программе используется запас: 200 МБ);
  • максимальное количество запросов в одной партии --- 100 000 (в программе --- 10 000 для запаса).

Если суммарный объём запросов превышает эти пределы, программа автоматически разбивает их на несколько партий. Каждая партия отправляется отдельным вызовом к Anthropic Batch API и получает свой идентификатор.

Отправка партии

Отправка выполняется через библиотеку anthropic (метод client.messages.batches.create). Вызов оборачивается в run_in_executor, чтобы не блокировать цикл обработки событий. После успешной отправки каждое задание получает статус processing, идентификатор партии и время отправки.

Единый исполнитель

Модуль batch_worker.py содержит единый фоновый исполнитель (unified_batch_worker), который работает в бесконечном цикле и чередует две фазы.

Фаза 1: отправка

Исполнитель ищет в базе данных задания со статусом pending. Если такие задания есть (но не более BATCH_SIZE за один раз), он:

  1. Параллельно скачивает исходные файлы из хранилища MinIO.
  2. Передаёт файлы в BatchService.submit_multiple_jobs для подготовки и отправки партии.
  3. Обновляет статус заданий в базе данных: успешные получают processing, неудачные --- failed.

При критической ошибке сервиса Google Cloud Vision (отключён биллинг) все задания помечаются как неудачные и не возвращаются в очередь, чтобы избежать бесконечных повторных попыток.

Фаза 2: проверка и обработка результатов

Исполнитель находит все уникальные идентификаторы партий со статусом processing и для каждой:

  1. Запрашивает текущее состояние через poll_batch_status (метод client.messages.batches.retrieve).
  2. Обновляет индикатор прогресса: процент выполненных запросов записывается в каждое связанное задание.
  3. Если партия завершилась (статус ended), загружает результаты через get_batch_results (метод client.messages.batches.results).
  4. Группирует результаты по идентификатору задания (по полю custom_id).
  5. Запускает параллельную обработку результатов каждого задания.

Интервал проверки

После каждого полного цикла (отправка + проверка) исполнитель ждёт POLL_INTERVAL секунд (по умолчанию 10). Это значение настраивается через переменную окружения.

Приостановка

Исполнитель проверяет флаг batch_worker_paused в коллекции settings базы данных. Если флаг установлен, цикл пропускает обе фазы и просто ждёт следующего интервала. Это позволяет приостановить обработку через интерфейс программы.

Ограничение по памяти

При обработке результатов каждое задание требует загрузки исходного файла в оперативную память (для нарезки страниц и создания миниатюр). Чтобы избежать исчерпания памяти, используется двойной механизм ограничения:

  1. Семафор на количество заданий. Одновременно обрабатывается не более RESULT_WORKERS заданий (по умолчанию определяется конфигурацией).
  2. Ограничение по суммарному объёму файлов. Программа отслеживает суммарный размер файлов, находящихся в обработке. Если добавление очередного файла превысит порог MAX_CONCURRENT_MB, задание ждёт, пока предыдущие не завершатся и не освободят место. Исключение: если ни одно задание не обрабатывается, любой файл принимается независимо от размера, чтобы избежать взаимной блокировки.

Дополнительно, при создании документов из результатов распознавания применяется адаптивный семафор:

  • файлы более 50 МБ обрабатываются по одному;
  • файлы от 20 до 50 МБ --- до двух одновременно;
  • файлы менее 20 МБ --- до четырёх одновременно.

Восстановление после перезапуска

При запуске серверной части вызывается функция recover_pending_batches. Она находит все задания с batch_status="processing" и добавляет их идентификаторы партий в набор активных. Единый исполнитель при следующей итерации обнаружит эти партии на фазе проверки и продолжит отслеживание.

Управление нагрузкой при импорте

Модуль workload_manager.py решает отдельную задачу: динамически регулирует число потоков при импорте файлов с Яндекс.Диска в зависимости от текущей загрузки системы.

Принцип работы

Каждая ресурсоёмкая задача при запуске регистрирует себя в Redis (ключ вида workload:task:{тип}:{идентификатор}) со временем жизни 70 минут. При завершении запись удаляется. Типы задач и их «стоимость»:

Тип задачи Стоимость
Обработка задания (job_processing) 2
Поиск по реестру (registry_search) 4
Выгрузка реестра (registry_export) 3

Общая ёмкость системы --- 12 условных единиц. При запросе на импорт программа подсчитывает суммарную стоимость активных задач и вычисляет доступный остаток:

доступно = 12 − сумма(количество_активных × стоимость)
потоков_для_PDF  = от 2 до 10, в зависимости от доступного остатка
потоков_для_TIFF = от 1 до 3, равно доступный_остаток ÷ 4

Если Redis недоступен, используются значения по умолчанию: 10 потоков для файлов и 2 для изображений.

Контекстный менеджер

Для удобства предусмотрен контекстный менеджер track_task, который автоматически регистрирует задачу при входе и удаляет при выходе:

with track_task("registry_search", task_id):
    # выполнение задачи

Настройки масштабирования

Все параметры масштабирования задаются через переменные окружения и дублируются в docker-compose.yml:

Переменная Значение по умолчанию Назначение
OCR_WORKERS 1 Число параллельных преобразований страниц при подготовке партии
BATCH_SIZE 30 Максимальное количество заданий, забираемых из очереди за одну итерацию
POLL_INTERVAL 10 Интервал проверки готовности партий (в секундах)
RESULT_WORKERS (из конфигурации) Число параллельно обрабатываемых результатов
MAX_CONCURRENT_MB (из конфигурации) Порог суммарного объёма файлов в обработке (в мегабайтах)

Рекомендации по масштабированию приведены в разделе Конфигурация.