2025, Oct 01 17:16

ImportError в Airflow Composer с BigQuery: почему «пропали» DAG’и и как это исправить

Кейс в Google Cloud Composer: ImportError в BigQuery-операторах ломает отображение DAG’ов. Как заменить оператор, очистить PyCache и перезапустить Airflow.

Обновления Airflow в Google Cloud Composer иногда проявляются неочевидно. Только что вы устранили ImportError — и вот уже весь набор DAG'ов будто исчез из интерфейса. В этом материале разбираем конкретный случай с операторами BigQuery: почему ImportError может обернуться «пропажей» DAG'ов и как аккуратно устранить проблему без побочных эффектов.

Что произошло

Несколько DAG'ов в Composer опирались на BigQueryCreateEmptyTableOperator. После обновления кода на более новый BigQueryCreateTableOperator, чтобы устранить ImportError, сама ошибка пропала, но все DAG'и перестали отображаться в интерфейсе Airflow. При этом файлы DAG по-прежнему лежали в каталоге GCS /dags, а журналы планировщика не показывали явных ошибок парсинга.

Минимальный пример, который воспроизводит проблему

Проблема начинается с импорта, который больше не разрешается в текущем окружении:

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyTableOperator
from datetime import datetime
with DAG(
    dag_id="dw_pipeline_daily",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
):
    pass

Airflow выдаёт ImportError: cannot import name 'BigQueryCreateEmptyTableOperator' from 'airflow.providers.google.cloud.operators.bigquery'.

Почему ImportError превратился в «пропажу DAG’ов»

После замены устаревшего импорта на BigQueryCreateTableOperator сам ImportError исчез, но в UI DAG’и всё равно не отображались. Причина в том, что разбор DAG’ов в Airflow может страдать из-за устаревших байткодов Python и процессов, продолжающих использовать кэшированное состояние. Когда меняется импорт, планировщику и веб-серверу зачастую требуется полноценная перезагрузка, чтобы заново построить представление о DAG’ах; иначе парсер может их не зарегистрировать — визуально это выглядит как будто они «пропали».

Есть ещё один важный аспект, который стоит проверить при разборе исходного ImportError. Несоответствие версий провайдеров между окружениями может привести к тому, что в одном окружении подтянется последняя версия провайдера и импорты сломаются, тогда как в другом зафиксирована стабильная версия и всё продолжает работать. Согласованная установка провайдеров в стейджинге и продакшне помогает избежать подобных сюрпризов с ImportError, зависящих от окружения.

Решение

Сначала обновите импорт на поддерживаемый оператор. Затем очистите кэш Python и перезапустите компоненты Airflow, чтобы принудительно выполнить чистый разбор DAG’ов. Эта связка устраняет ImportError и предотвращает симптом «пропавших DAG’ов», обновляя состояние парсера.

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateTableOperator
from datetime import datetime
with DAG(
    dag_id="dw_pipeline_daily",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
):
    pass

После выката изменений очистите PyCache в каталоге с DAG’ами и перезапустите планировщик и веб-сервер Airflow. Так Composer удалит устаревший байткод, перезагрузит модули с обновлённым путём импорта и заново зарегистрирует ваши DAG’и в интерфейсе.

Почему это важно

Динамический разбор DAG’ов в Airflow опирается на устойчивый граф импортов и «чистое» состояние модулей. Когда операторы перемещаются или переименовываются, одной правки строки импорта недостаточно — рантайму также нужно сбросить кэшированные артефакты. Иначе интерфейс может не соответствовать содержимому в GCS-бакете, что приводит к путанице и лишним циклам отладки. Единый подход к управлению версиями провайдеров в разных окружениях дополнительно снижает шум, чтобы то, что работает в одном месте, не падало в другом с ImportError.

Выводы

Заменяя BigQueryCreateEmptyTableOperator на BigQueryCreateTableOperator ради исправления ImportError, обязательно очистите PyCache и перезапустите планировщик и веб-сервер Airflow. Это обеспечивает свежий разбор DAG’ов и не позволяет интерфейсу «потерять» их. Следите за согласованностью версий провайдеров между окружениями, чтобы та же ошибка не всплыла в другом месте. Эти две практики — корректная замена оператора и чистый цикл перезапуска — дают предсказуемые и видимые в UI DAG’и после изменений в коде.

Статья основана на вопросе на StackOverflow от Laura и ответе от Pauls Creationids Interior Des.