Партионная обработка¶
Партионная обработка --- алгоритм экономичной отправки запросов к языковой модели Claude через Anthropic Batch API. Вместо немедленных запросов программа накапливает их в партии и отправляет пакетом, получая скидку 50 % на стоимость.
Подробное описание модулей batch_service.py, batch_worker.py и workload_manager.py --- в разделе Партионная обработка.
Суть экономии¶
В синхронном режиме каждый запрос к Claude выполняется немедленно и оплачивается по полной стоимости. В пакетном режиме Anthropic обрабатывает запросы в менее загруженное время и снижает цену вдвое. Гарантированный срок выполнения --- до 24 часов; на практике результаты приходят за 5--60 минут.
Режим определяется настройкой PROCESSING_MODE:
sync--- немедленные запросы, результат сразу;batch--- очередь с пакетной отправкой, результат через 5--60 минут.
Жизненный цикл партии¶
Формирование запросов¶
Для каждого задания обработки модуль BatchService выполняет:
- Скачивает исходный файл из хранилища MinIO.
- Преобразует страницы в изображения.
- Если включён Google Cloud Vision, выполняет оптическое распознавание и определение поворота. Результаты сохраняются в базу данных сразу, чтобы при повторной обработке не обращаться к сервису снова.
- Разбивает страницы на группы по 8 штук. Каждая группа становится одним запросом к Claude.
- Присваивает каждому запросу идентификатор вида
job_{id}_batch_{номер}.
Ограничения Anthropic Batch API¶
| Ограничение | Лимит от Anthropic | Лимит в программе |
|---|---|---|
| Размер партии | 256 МБ | 200 МБ (запас) |
| Число запросов | 100 000 | 10 000 (запас) |
Если запросы не помещаются в одну партию, программа автоматически разбивает их на несколько. Каждая отправляется отдельно и получает свой идентификатор.
Отправка¶
Отправка выполняется через библиотеку anthropic в отдельном потоке (run_in_executor), чтобы не блокировать цикл обработки событий. После отправки каждое задание получает статус processing, идентификатор партии и время отправки.
Проверка готовности¶
Единый исполнитель (unified_batch_worker) периодически запрашивает состояние всех активных партий. Интервал проверки задаётся переменной POLL_INTERVAL (по умолчанию 10 секунд). При каждой проверке обновляется индикатор прогресса: процент выполненных запросов записывается в каждое связанное задание.
Получение и обработка результатов¶
Когда партия завершена (статус ended):
- Загружаются результаты всех запросов.
- Результаты группируются по идентификатору задания (по полю
custom_id). - Для каждого задания параллельно выполняется обработка: извлечение данных из ответов Claude, создание документов, нарезка страниц, создание миниатюр, загрузка в MinIO, автоматическая группировка.
Единый исполнитель¶
Модуль batch_worker.py содержит фоновый исполнитель, который работает в бесконечном цикле и чередует две фазы:
Фаза 1 --- отправка. Исполнитель ищет задания со статусом pending. Если такие есть (не более BATCH_SIZE за раз), он скачивает файлы, подготавливает запросы и отправляет партию.
Фаза 2 --- проверка. Исполнитель находит все активные партии и проверяет их состояние. Завершённые партии обрабатываются.
Чередование фаз в едином процессе (а не в двух отдельных) предотвращает ситуацию, когда отправка и обработка конкурируют за ресурсы: пока обрабатываются результаты одной партии, новые задания не отправляются, и наоборот.
Приостановка¶
Исполнитель проверяет флаг batch_worker_paused в настройках. Если флаг установлен, обе фазы пропускаются. Это позволяет приостановить обработку через интерфейс.
Восстановление после перезапуска¶
При запуске серверной части функция recover_pending_batches находит задания с batch_status="processing" и возобновляет отслеживание их партий. Результаты, пришедшие во время простоя, будут обработаны при следующей итерации.
Ограничение по памяти¶
Обработка результатов требует загрузки исходных файлов в оперативную память. Чтобы избежать исчерпания, используется двойной механизм:
Семафор на количество заданий¶
Одновременно обрабатывается не более RESULT_WORKERS заданий.
Ограничение по объёму файлов¶
Программа отслеживает суммарный размер файлов в обработке. Если добавление очередного файла превысит порог MAX_CONCURRENT_MB, задание ждёт освобождения. Исключение: если ни одно задание не обрабатывается, любой файл принимается, чтобы избежать взаимной блокировки.
Адаптивный семафор для документов¶
При создании документов из результатов распознавания:
| Размер файла | Одновременно |
|---|---|
| Более 50 МБ | 1 |
| 20--50 МБ | 2 |
| Менее 20 МБ | 4 |
Управление нагрузкой при импорте¶
Модуль workload_manager.py динамически регулирует число потоков при импорте файлов с Яндекс.Диска. Каждая ресурсоёмкая задача регистрируется в Redis со своей «стоимостью»:
| Тип задачи | Стоимость |
|---|---|
| Обработка задания | 2 |
| Поиск по реестру | 4 |
| Выгрузка реестра | 3 |
Общая ёмкость --- 12 условных единиц. Доступный остаток определяет число потоков:
доступно = 12 − сумма(стоимость активных задач)
потоков для файлов = от 2 до 10
потоков для изображений = от 1 до 3
Записи автоматически истекают через 70 минут (на 5 минут больше максимального времени задачи), чтобы при аварийном завершении задачи ресурсы освобождались.
Настройки масштабирования¶
| Переменная | По умолчанию | Назначение |
|---|---|---|
OCR_WORKERS |
1 | Число параллельных преобразований страниц |
BATCH_SIZE |
30 | Заданий за одну итерацию отправки |
POLL_INTERVAL |
10 | Интервал проверки готовности (секунды) |
RESULT_WORKERS |
(из конфигурации) | Параллельно обрабатываемых результатов |
MAX_CONCURRENT_MB |
(из конфигурации) | Порог суммарного объёма файлов (МБ) |
Рекомендации по выбору значений --- в разделе Конфигурация.