2025, Nov 24 00:01

Отмена запланированных задач в ARQ: JobDef против Job и правильный abort()

Показываем, почему попытки через JobDef и delete_job не отменяют задачи в ARQ, и как правильно: получить Job по job_id и вызвать abort(); важные нюансы Redis.

Отмена запланированной задачи в ARQ кажется обманчиво простой — пока не выясняется, что ни один из очевидных способов её действительно не удаляет. Источник путаницы — различие между данными, которые можно перечислять, и данными, которые можно изменять. Если вы пытаетесь остановить запланированную задачу, а вызовы молча ничего не делают, скорее всего, вы работаете не с тем типом объекта.

Демонстрация проблемы

Ниже — типичные, на первый взгляд логичные попытки, которые не приводят к отмене запланированной работы:

rq = await open_arq_conn()
await rq.delete_job(plan.job_id)

rq = await open_arq_conn()
pending = await rq.queued_jobs()
for item in pending:
    if item.job_id == wanted_job_id:
        await item.delete()

for item in pending:
    if item.job_id == plan.job_id:
        await item.abort()

for jdef in pending:
    if jdef.job_id == plan.job_id:
        j = await rq.get_job(jdef.job_id)
        await j.abort()

Что происходит на самом деле

Объекты, которые вы получаете из queued_jobs(), — это экземпляры JobDef. Они описывают поставленную в очередь работу, но не предназначены для изменения. Поэтому вызовы delete() или abort() для элементов из этого списка не дадут нужного эффекта. По той же причине delete_job() — не подходящая операция для прерывания запланированного запуска. Чтобы управлять конкретной задачей, ARQ ожидает, что вы будете работать с экземпляром Job, привязанным к job_id и тому же соединению с Redis.

Решение

Надёжный способ остановить запланированную задачу — создать объект Job и вызвать у него abort(). Так вы напрямую обращаетесь к задаче в Redis с корректной семантикой:

from arq.jobs import Job

async def terminate_scheduled(job_key: str):
    conn = await open_arq_conn()
    task = Job(job_id=job_key, redis=conn)
    outcome = await task.abort()
    return outcome

# usage
result = await terminate_scheduled("your-job-id-here")

Так создаётся Job, связанный с backend’ом Redis, и ARQ получает команду прервать запланированную задачу.

Тонкость, о которой важно помнить

Есть практическая разница между запланированной работой и элементами, уже попавшими в основную очередь. Поэтому перечисление заданий в очереди и попытки менять эти описания не срабатывают. Как кратко сформулировано в одном советe:

Его нужно получить через get_scheduled_jobs() и затем вызвать у задачи метод abort(), поскольку она ещё не в основной очереди.

Главный вывод: для прерывания нужен объект Job.

Почему это важно

Управление жизненным циклом задач — основа предсказуемого планирования. Используйте JobDef для просмотра, а когда нужно действовать — переходите к Job; так вы избегаете «тихих» сбоев, возникающих при использовании неподходящего интерфейса. abort() возвращает понятный булев результат, что упрощает интеграцию отмены в автоматизацию и мониторинг, и соблюдает ограничения ARQ, работая только до начала выполнения.

Если у вас периодические расписания, помните: их удаление может потребовать полного удаления следов в Redis — в зависимости от конфигурации. Это отдельная задача по сравнению с остановкой одного запланированного экземпляра.

Выводы

Когда нужно отменить запланированную задачу ARQ, не работайте с JobDef, полученными из списков. Создайте Job с нужным job_id и вызовите abort(). Это соответствует модели ARQ, где есть только для чтения описания и управляемые дескрипторы задач, даёт однозначный флаг успеха и устраняет путаницу между запланированными элементами и основной очередью. Списки очереди используйте для диагностики, а для управления — Job.