2025, Dec 20 18:03
Как запустить C++‑утилиту с лимитом параллелизма в Python
Ограничиваем параллелизм: запускаем C++‑процессы из Python через subprocess и ThreadPool. Три одновременных задания, прогресс‑бар, обработка ошибок. Примеры.
Когда нужно запускать многопоточный исполняемый файл на C++ на множестве входных данных, но при этом жестко ограничивать параллелизм, простейший пул процессов в Python аккуратно решает эту оркестрацию. Цель — поддерживать одновременно три внешних процесса, при том что каждый из них поднимает по 10 потоков, чтобы общее число потоков в системе оставалось в заданных рамках.
Базовый вариант: с чего начинаем
Отправная точка — управляющий скрипт на Python, который читает имена образцов из JSON и стремится держать в работе ровно три задачи. Эскиз ниже иллюстрирует задуманный поток управления, пока еще без рабочего планировщика.
import json
if __name__ == "__main__":
# Путь к исполняемому файлу C++, который внутри поднимает 10 потоков
TOOL_BIN = "./GenerateTrimmedNTuple"
sample_ids = json.load(open("samples.json"))
task_list = [[TOOL_BIN, s_id] for s_id in sample_ids]
# Держим активными не более трех внешних процессов одновременно
active = 0
while active < 3:
# запустить задачу и увеличить счетчик
# по завершении задачи уменьшить счетчик
pass
Что на самом деле происходит
Нужно жестко ограничить число одновременно запущенных исполняемых файлов тремя — независимо от количества образцов. Каждый из них самостоятельно создает 10 потоков, и такая оркестрация гарантирует, что суммарно вы не выйдете за предел в 30 потоков. Не хватает лишь надежного способа параллельно запускать внешние команды и получать уведомление об их завершении.
Практическое решение с subprocess и ThreadPool
Сочетание subprocess.run для запуска внешнего бинарника и multiprocessing.pool.ThreadPool для ограничения параллелизма дает ровно то, что нужно. Пул сопоставляет списки аргументов с рабочей функцией, одновременно выполняет не более трех задач и возвращает результаты по мере завершения отдельных процессов.
import multiprocessing.pool as mp_pool
import json
import subprocess
if __name__ == "__main__":
# Исполняемый файл C++: при каждом запуске создает 10 потоков
BIN_PATH = "./GenerateTrimmedNTuple"
items = json.load(open("samples.json"))
cmd_args = [[BIN_PATH, name] for name in items]
concurrency = 3
with mp_pool.ThreadPool(concurrency) as executor:
def run_job(argv):
subprocess.run(argv, check=True)
return argv
for finished_cmd in executor.imap_unordered(run_job, cmd_args):
print(f"command: {finished_cmd} finished!")
Параметр check=True заставит драйвер завершиться с ошибкой, если какой-либо внешний процесс вернется с ненулевым кодом. Если нужно продолжать при сбоях, уберите check=True и изучите другие параметры subprocess.run, например capture_output, чтобы явно управлять потоками вывода.
Необязательная индикация прогресса
Хотите прогресс-бар — оберните итератор в tqdm. В качестве total укажите количество команд к выполнению.
import tqdm
# ... в том же контексте и с теми же переменными, что и выше:
for finished_cmd in tqdm.tqdm(executor.imap_unordered(run_job, cmd_args), total=len(cmd_args)):
print(f"command: {finished_cmd} finished!")
Почему этот подход важен
Такая схема гарантирует, что одновременно работают не более трех исполняемых файлов, а значит, с учетом 10 потоков на процесс общий счетчик остается в допустимых пределах. В отличие от жонглирования несколькими терминалами или вкладками, пул управляет жизненным циклом, фиксирует завершения по мере их наступления и поддерживает конвейер загруженным без ручного присмотра.
Альтернатива на стороне shell
Если удобнее оставаться в shell, есть прямой путь: GNU parallel вместе с jq для извлечения значений из JSON. Для списка значений в samples.json следующая команда запускает три параллельных процесса:
parallel -j3 -a <( jq -r .[] samples.json ) ./GenerateTrimmedNTuple
А если в JSON лежит словарь и нужны только его ключи:
jq -r 'keys | .[]' sample2.json
Итоги
Для контролируемого параллельного запуска внешней C++‑утилиты на каждый образец небольшой ThreadPool вокруг subprocess.run — простой и надежный выбор. Он фиксирует уровень параллелизма, предсказуемо сигнализирует о сбоях при check=True и, при желании, дополняется прогресс‑баром. Держите входные данные в виде списка, отдайте его пулу — и пусть раннер поддерживает три активных процесса, пока не завершатся все образцы.