Партионная обработка¶
Модули batch_service.py, batch_worker.py и workload_manager.py реализуют пакетную обработку документов через Anthropic Batch API. Пакетный режим позволяет отправлять запросы к языковой модели Claude асинхронно и получать результаты с задержкой от 5 до 60 минут, но со скидкой 50 % на стоимость токенов.
Зачем нужна партионная обработка¶
В синхронном режиме каждый запрос к Claude выполняется немедленно: серверная часть отправляет изображения страниц и ждёт ответа. Это удобно для единичных файлов, но при массовой загрузке (десятки и сотни файлов) синхронный режим обходится вдвое дороже.
Пакетный режим объединяет несколько запросов в одну партию (batch), отправляет её через специальную точку Anthropic Batch API и периодически проверяет готовность. Экономия достигается за счёт того, что Anthropic обрабатывает пакетные запросы в менее загруженное время. Гарантированное время выполнения --- до 24 часов, на практике результаты обычно приходят за 5--60 минут.
Режим работы определяется настройкой PROCESSING_MODE:
sync--- немедленные запросы к Claude, результат сразу;batch--- очередь с пакетной отправкой, результат через 5--60 минут.
Формирование партии запросов¶
Подготовка одного задания¶
Модуль batch_service.py (класс BatchService) превращает каждое задание (job) в набор запросов к Claude:
- Из хранилища MinIO скачивается исходный файл.
- Страницы файла преобразуются в изображения с помощью
PDFProcessor. - Если включён сервис Google Cloud Vision, для каждой страницы выполняется оптическое распознавание текста и определение поворота. Результаты распознавания сохраняются в базу данных сразу после получения, чтобы при повторной обработке не обращаться к сервису распознавания заново.
- Страницы разбиваются на группы по 8 штук (параметр
batch_sizeв классе). Каждая группа становится одним запросом к Claude. - Для каждого запроса формируется содержимое: изображения страниц в формате base64 и текстовая подсказка с пятью задачами (сегментация, классификация, извлечение реквизитов из содержимого, разбор имени файла, поиск рукописных номеров).
Каждому запросу присваивается идентификатор вида job_{id задания}_batch_{номер группы}, по которому позже результаты связываются обратно с заданием.
Параллельная подготовка нескольких заданий¶
Метод submit_multiple_jobs обрабатывает несколько заданий одновременно. Параллелизм ограничен семафором с числом разрешений, равным переменной окружения OCR_WORKERS. Это контролирует нагрузку на процессор во время преобразования страниц и оптического распознавания.
Автоматическое разбиение на несколько партий¶
У Anthropic Batch API есть ограничения:
- максимальный размер партии --- 256 МБ (в программе используется запас: 200 МБ);
- максимальное количество запросов в одной партии --- 100 000 (в программе --- 10 000 для запаса).
Если суммарный объём запросов превышает эти пределы, программа автоматически разбивает их на несколько партий. Каждая партия отправляется отдельным вызовом к Anthropic Batch API и получает свой идентификатор.
Отправка партии¶
Отправка выполняется через библиотеку anthropic (метод client.messages.batches.create). Вызов оборачивается в run_in_executor, чтобы не блокировать цикл обработки событий. После успешной отправки каждое задание получает статус processing, идентификатор партии и время отправки.
Единый исполнитель¶
Модуль batch_worker.py содержит единый фоновый исполнитель (unified_batch_worker), который работает в бесконечном цикле и чередует две фазы.
Фаза 1: отправка¶
Исполнитель ищет в базе данных задания со статусом pending. Если такие задания есть (но не более BATCH_SIZE за один раз), он:
- Параллельно скачивает исходные файлы из хранилища MinIO.
- Передаёт файлы в
BatchService.submit_multiple_jobsдля подготовки и отправки партии. - Обновляет статус заданий в базе данных: успешные получают
processing, неудачные ---failed.
При критической ошибке сервиса Google Cloud Vision (отключён биллинг) все задания помечаются как неудачные и не возвращаются в очередь, чтобы избежать бесконечных повторных попыток.
Фаза 2: проверка и обработка результатов¶
Исполнитель находит все уникальные идентификаторы партий со статусом processing и для каждой:
- Запрашивает текущее состояние через
poll_batch_status(методclient.messages.batches.retrieve). - Обновляет индикатор прогресса: процент выполненных запросов записывается в каждое связанное задание.
- Если партия завершилась (статус
ended), загружает результаты черезget_batch_results(методclient.messages.batches.results). - Группирует результаты по идентификатору задания (по полю
custom_id). - Запускает параллельную обработку результатов каждого задания.
Интервал проверки¶
После каждого полного цикла (отправка + проверка) исполнитель ждёт POLL_INTERVAL секунд (по умолчанию 10). Это значение настраивается через переменную окружения.
Приостановка¶
Исполнитель проверяет флаг batch_worker_paused в коллекции settings базы данных. Если флаг установлен, цикл пропускает обе фазы и просто ждёт следующего интервала. Это позволяет приостановить обработку через интерфейс программы.
Ограничение по памяти¶
При обработке результатов каждое задание требует загрузки исходного файла в оперативную память (для нарезки страниц и создания миниатюр). Чтобы избежать исчерпания памяти, используется двойной механизм ограничения:
- Семафор на количество заданий. Одновременно обрабатывается не более
RESULT_WORKERSзаданий (по умолчанию определяется конфигурацией). - Ограничение по суммарному объёму файлов. Программа отслеживает суммарный размер файлов, находящихся в обработке. Если добавление очередного файла превысит порог
MAX_CONCURRENT_MB, задание ждёт, пока предыдущие не завершатся и не освободят место. Исключение: если ни одно задание не обрабатывается, любой файл принимается независимо от размера, чтобы избежать взаимной блокировки.
Дополнительно, при создании документов из результатов распознавания применяется адаптивный семафор:
- файлы более 50 МБ обрабатываются по одному;
- файлы от 20 до 50 МБ --- до двух одновременно;
- файлы менее 20 МБ --- до четырёх одновременно.
Восстановление после перезапуска¶
При запуске серверной части вызывается функция recover_pending_batches. Она находит все задания с batch_status="processing" и добавляет их идентификаторы партий в набор активных. Единый исполнитель при следующей итерации обнаружит эти партии на фазе проверки и продолжит отслеживание.
Управление нагрузкой при импорте¶
Модуль workload_manager.py решает отдельную задачу: динамически регулирует число потоков при импорте файлов с Яндекс.Диска в зависимости от текущей загрузки системы.
Принцип работы¶
Каждая ресурсоёмкая задача при запуске регистрирует себя в Redis (ключ вида workload:task:{тип}:{идентификатор}) со временем жизни 70 минут. При завершении запись удаляется. Типы задач и их «стоимость»:
| Тип задачи | Стоимость |
|---|---|
Обработка задания (job_processing) |
2 |
Поиск по реестру (registry_search) |
4 |
Выгрузка реестра (registry_export) |
3 |
Общая ёмкость системы --- 12 условных единиц. При запросе на импорт программа подсчитывает суммарную стоимость активных задач и вычисляет доступный остаток:
доступно = 12 − сумма(количество_активных × стоимость)
потоков_для_PDF = от 2 до 10, в зависимости от доступного остатка
потоков_для_TIFF = от 1 до 3, равно доступный_остаток ÷ 4
Если Redis недоступен, используются значения по умолчанию: 10 потоков для файлов и 2 для изображений.
Контекстный менеджер¶
Для удобства предусмотрен контекстный менеджер track_task, который автоматически регистрирует задачу при входе и удаляет при выходе:
with track_task("registry_search", task_id):
# выполнение задачи
Настройки масштабирования¶
Все параметры масштабирования задаются через переменные окружения и дублируются в docker-compose.yml:
| Переменная | Значение по умолчанию | Назначение |
|---|---|---|
OCR_WORKERS |
1 | Число параллельных преобразований страниц при подготовке партии |
BATCH_SIZE |
30 | Максимальное количество заданий, забираемых из очереди за одну итерацию |
POLL_INTERVAL |
10 | Интервал проверки готовности партий (в секундах) |
RESULT_WORKERS |
(из конфигурации) | Число параллельно обрабатываемых результатов |
MAX_CONCURRENT_MB |
(из конфигурации) | Порог суммарного объёма файлов в обработке (в мегабайтах) |
Рекомендации по масштабированию приведены в разделе Конфигурация.