Перейти к содержанию

Партионная обработка

Партионная обработка --- алгоритм экономичной отправки запросов к языковой модели Claude через Anthropic Batch API. Вместо немедленных запросов программа накапливает их в партии и отправляет пакетом, получая скидку 50 % на стоимость.

Подробное описание модулей batch_service.py, batch_worker.py и workload_manager.py --- в разделе Партионная обработка.

Суть экономии

В синхронном режиме каждый запрос к Claude выполняется немедленно и оплачивается по полной стоимости. В пакетном режиме Anthropic обрабатывает запросы в менее загруженное время и снижает цену вдвое. Гарантированный срок выполнения --- до 24 часов; на практике результаты приходят за 5--60 минут.

Режим определяется настройкой PROCESSING_MODE:

  • sync --- немедленные запросы, результат сразу;
  • batch --- очередь с пакетной отправкой, результат через 5--60 минут.

Жизненный цикл партии

Формирование запросов

Для каждого задания обработки модуль BatchService выполняет:

  1. Скачивает исходный файл из хранилища MinIO.
  2. Преобразует страницы в изображения.
  3. Если включён Google Cloud Vision, выполняет оптическое распознавание и определение поворота. Результаты сохраняются в базу данных сразу, чтобы при повторной обработке не обращаться к сервису снова.
  4. Разбивает страницы на группы по 8 штук. Каждая группа становится одним запросом к Claude.
  5. Присваивает каждому запросу идентификатор вида job_{id}_batch_{номер}.

Ограничения Anthropic Batch API

Ограничение Лимит от Anthropic Лимит в программе
Размер партии 256 МБ 200 МБ (запас)
Число запросов 100 000 10 000 (запас)

Если запросы не помещаются в одну партию, программа автоматически разбивает их на несколько. Каждая отправляется отдельно и получает свой идентификатор.

Отправка

Отправка выполняется через библиотеку anthropic в отдельном потоке (run_in_executor), чтобы не блокировать цикл обработки событий. После отправки каждое задание получает статус processing, идентификатор партии и время отправки.

Проверка готовности

Единый исполнитель (unified_batch_worker) периодически запрашивает состояние всех активных партий. Интервал проверки задаётся переменной POLL_INTERVAL (по умолчанию 10 секунд). При каждой проверке обновляется индикатор прогресса: процент выполненных запросов записывается в каждое связанное задание.

Получение и обработка результатов

Когда партия завершена (статус ended):

  1. Загружаются результаты всех запросов.
  2. Результаты группируются по идентификатору задания (по полю custom_id).
  3. Для каждого задания параллельно выполняется обработка: извлечение данных из ответов Claude, создание документов, нарезка страниц, создание миниатюр, загрузка в MinIO, автоматическая группировка.

Единый исполнитель

Модуль batch_worker.py содержит фоновый исполнитель, который работает в бесконечном цикле и чередует две фазы:

Фаза 1 --- отправка. Исполнитель ищет задания со статусом pending. Если такие есть (не более BATCH_SIZE за раз), он скачивает файлы, подготавливает запросы и отправляет партию.

Фаза 2 --- проверка. Исполнитель находит все активные партии и проверяет их состояние. Завершённые партии обрабатываются.

Чередование фаз в едином процессе (а не в двух отдельных) предотвращает ситуацию, когда отправка и обработка конкурируют за ресурсы: пока обрабатываются результаты одной партии, новые задания не отправляются, и наоборот.

Приостановка

Исполнитель проверяет флаг batch_worker_paused в настройках. Если флаг установлен, обе фазы пропускаются. Это позволяет приостановить обработку через интерфейс.

Восстановление после перезапуска

При запуске серверной части функция recover_pending_batches находит задания с batch_status="processing" и возобновляет отслеживание их партий. Результаты, пришедшие во время простоя, будут обработаны при следующей итерации.

Ограничение по памяти

Обработка результатов требует загрузки исходных файлов в оперативную память. Чтобы избежать исчерпания, используется двойной механизм:

Семафор на количество заданий

Одновременно обрабатывается не более RESULT_WORKERS заданий.

Ограничение по объёму файлов

Программа отслеживает суммарный размер файлов в обработке. Если добавление очередного файла превысит порог MAX_CONCURRENT_MB, задание ждёт освобождения. Исключение: если ни одно задание не обрабатывается, любой файл принимается, чтобы избежать взаимной блокировки.

Адаптивный семафор для документов

При создании документов из результатов распознавания:

Размер файла Одновременно
Более 50 МБ 1
20--50 МБ 2
Менее 20 МБ 4

Управление нагрузкой при импорте

Модуль workload_manager.py динамически регулирует число потоков при импорте файлов с Яндекс.Диска. Каждая ресурсоёмкая задача регистрируется в Redis со своей «стоимостью»:

Тип задачи Стоимость
Обработка задания 2
Поиск по реестру 4
Выгрузка реестра 3

Общая ёмкость --- 12 условных единиц. Доступный остаток определяет число потоков:

доступно = 12 − сумма(стоимость активных задач)
потоков для файлов = от 2 до 10
потоков для изображений = от 1 до 3

Записи автоматически истекают через 70 минут (на 5 минут больше максимального времени задачи), чтобы при аварийном завершении задачи ресурсы освобождались.

Настройки масштабирования

Переменная По умолчанию Назначение
OCR_WORKERS 1 Число параллельных преобразований страниц
BATCH_SIZE 30 Заданий за одну итерацию отправки
POLL_INTERVAL 10 Интервал проверки готовности (секунды)
RESULT_WORKERS (из конфигурации) Параллельно обрабатываемых результатов
MAX_CONCURRENT_MB (из конфигурации) Порог суммарного объёма файлов (МБ)

Рекомендации по выбору значений --- в разделе Конфигурация.