2025, Dec 20 18:03

Как запустить C++‑утилиту с лимитом параллелизма в Python

Ограничиваем параллелизм: запускаем C++‑процессы из Python через subprocess и ThreadPool. Три одновременных задания, прогресс‑бар, обработка ошибок. Примеры.

Когда нужно запускать многопоточный исполняемый файл на C++ на множестве входных данных, но при этом жестко ограничивать параллелизм, простейший пул процессов в Python аккуратно решает эту оркестрацию. Цель — поддерживать одновременно три внешних процесса, при том что каждый из них поднимает по 10 потоков, чтобы общее число потоков в системе оставалось в заданных рамках.

Базовый вариант: с чего начинаем

Отправная точка — управляющий скрипт на Python, который читает имена образцов из JSON и стремится держать в работе ровно три задачи. Эскиз ниже иллюстрирует задуманный поток управления, пока еще без рабочего планировщика.

import json
if __name__ == "__main__":
    # Путь к исполняемому файлу C++, который внутри поднимает 10 потоков
    TOOL_BIN = "./GenerateTrimmedNTuple"
    sample_ids = json.load(open("samples.json"))
    task_list = [[TOOL_BIN, s_id] for s_id in sample_ids]
    # Держим активными не более трех внешних процессов одновременно
    active = 0
    while active < 3:
        # запустить задачу и увеличить счетчик
        # по завершении задачи уменьшить счетчик
        pass

Что на самом деле происходит

Нужно жестко ограничить число одновременно запущенных исполняемых файлов тремя — независимо от количества образцов. Каждый из них самостоятельно создает 10 потоков, и такая оркестрация гарантирует, что суммарно вы не выйдете за предел в 30 потоков. Не хватает лишь надежного способа параллельно запускать внешние команды и получать уведомление об их завершении.

Практическое решение с subprocess и ThreadPool

Сочетание subprocess.run для запуска внешнего бинарника и multiprocessing.pool.ThreadPool для ограничения параллелизма дает ровно то, что нужно. Пул сопоставляет списки аргументов с рабочей функцией, одновременно выполняет не более трех задач и возвращает результаты по мере завершения отдельных процессов.

import multiprocessing.pool as mp_pool
import json
import subprocess
if __name__ == "__main__":
    # Исполняемый файл C++: при каждом запуске создает 10 потоков
    BIN_PATH = "./GenerateTrimmedNTuple"
    items = json.load(open("samples.json"))
    cmd_args = [[BIN_PATH, name] for name in items]
    concurrency = 3
    with mp_pool.ThreadPool(concurrency) as executor:
        def run_job(argv):
            subprocess.run(argv, check=True)
            return argv
        for finished_cmd in executor.imap_unordered(run_job, cmd_args):
            print(f"command: {finished_cmd} finished!")

Параметр check=True заставит драйвер завершиться с ошибкой, если какой-либо внешний процесс вернется с ненулевым кодом. Если нужно продолжать при сбоях, уберите check=True и изучите другие параметры subprocess.run, например capture_output, чтобы явно управлять потоками вывода.

Необязательная индикация прогресса

Хотите прогресс-бар — оберните итератор в tqdm. В качестве total укажите количество команд к выполнению.

import tqdm
# ... в том же контексте и с теми же переменными, что и выше:
for finished_cmd in tqdm.tqdm(executor.imap_unordered(run_job, cmd_args), total=len(cmd_args)):
    print(f"command: {finished_cmd} finished!")

Почему этот подход важен

Такая схема гарантирует, что одновременно работают не более трех исполняемых файлов, а значит, с учетом 10 потоков на процесс общий счетчик остается в допустимых пределах. В отличие от жонглирования несколькими терминалами или вкладками, пул управляет жизненным циклом, фиксирует завершения по мере их наступления и поддерживает конвейер загруженным без ручного присмотра.

Альтернатива на стороне shell

Если удобнее оставаться в shell, есть прямой путь: GNU parallel вместе с jq для извлечения значений из JSON. Для списка значений в samples.json следующая команда запускает три параллельных процесса:

parallel -j3 -a <( jq -r .[] samples.json ) ./GenerateTrimmedNTuple

А если в JSON лежит словарь и нужны только его ключи:

jq -r 'keys | .[]' sample2.json

Итоги

Для контролируемого параллельного запуска внешней C++‑утилиты на каждый образец небольшой ThreadPool вокруг subprocess.run — простой и надежный выбор. Он фиксирует уровень параллелизма, предсказуемо сигнализирует о сбоях при check=True и, при желании, дополняется прогресс‑баром. Держите входные данные в виде списка, отдайте его пулу — и пусть раннер поддерживает три активных процесса, пока не завершатся все образцы.