Apache Airflow: оркестрация рабочих процессов данных

Курс знакомит с Apache Airflow как платформой для оркестрации ETL/ELT и других задач по расписанию. Вы научитесь проектировать DAG, настраивать окружение, управлять зависимостями, мониторингом и масштабированием в продакшене.

1. Введение в Airflow и ключевые концепции

Введение в Airflow и ключевые концепции

Что такое Apache Airflow

Apache Airflow — это платформа для оркестрации рабочих процессов, в которой вы описываете пайплайны как код и затем планируете, запускаете и наблюдаете их выполнение. Airflow часто используют для задач инженерии данных: регулярной загрузки данных, преобразований, запуска расчётов, обновления витрин, машинного обучения и других повторяемых процессов.

Ключевая идея Airflow: вы явно описываете зависимости между шагами, а система берёт на себя планирование, очереди, ретраи, параллельность и наблюдаемость.

Официальные источники:

  • Apache Airflow (официальный сайт)
  • Документация Airflow
  • Зачем нужен оркестратор

    В реальных системах данные редко обрабатываются одним скриптом. Обычно есть цепочка действий:

  • дождаться появления файла или события
  • загрузить данные
  • проверить качество
  • преобразовать
  • записать результат
  • уведомить команду
  • Если запускать всё вручную или через cron, быстро появляются проблемы:

  • нет прозрачной картины зависимостей
  • трудно управлять ретраями и падениями
  • нет единого места, где видно состояние пайплайна
  • сложно масштабировать параллельные запуски
  • Airflow решает эти проблемы, предоставляя:

  • граф зависимостей шагов
  • централизованное планирование
  • повторные попытки и политики ошибок
  • журналирование и UI
  • расширяемость под разные системы
  • Airflow как workflow-as-code

    В Airflow пайплайн задаётся кодом на Python. Это важно по двум причинам:

  • изменения версионируются в Git так же, как и любой код
  • сложную логику (ветвления, шаблоны, генерацию задач) можно выражать программно
  • При этом Airflow не предназначен для тяжёлых вычислений внутри себя. Типичный подход: Airflow оркестрирует внешние системы (Spark, DBT, DWH, Kubernetes Jobs), а не заменяет их.

    Главные сущности Airflow

    DAG

    DAG (Directed Acyclic Graph, ориентированный ацикличный граф) — это описание пайплайна.

  • Ориентированный означает, что зависимости имеют направление: от предшественника к последователю.
  • Ацикличный означает, что нельзя построить зависимость, которая приводит к циклу вида A -> B -> A.
  • Каждый DAG имеет:

  • dag_id — уникальный идентификатор
  • расписание или условия запуска
  • набор задач и зависимости между ними
  • !Пример DAG как цепочки задач и зависимостей

    Task

    Task — шаг в DAG. Важно различать:

  • задачу как объект в графе
  • запуск задачи как конкретную попытку выполнения
  • Одна и та же задача может запускаться много раз в разные моменты времени (например, каждый день).

    Operator

    Operator — шаблон того, что именно делает задача. Например:

  • BashOperator выполняет команду shell
  • PythonOperator вызывает Python-функцию
  • операторы для SQL/облаков/кластеров запускают запросы и джобы во внешних системах
  • В современном Airflow вы также встретите TaskFlow API, где задачи часто объявляют через декоратор @task и пишут более «питоничный» код. Под капотом это всё равно превращается в задачи DAG.

    Task Instance и DAG Run

    Когда DAG стартует, создаётся DAG Run — конкретный запуск пайплайна.

    Внутри него для каждой задачи создаётся Task Instance — конкретное выполнение задачи в рамках данного запуска.

    Это разделение помогает понимать UI:

  • DAG описывает структуру
  • DAG Run — конкретный запуск по расписанию или вручную
  • Task Instance — статус конкретной задачи в конкретном запуске
  • Dependency

    Зависимости — это правила порядка выполнения. Если transform зависит от extract, то transform не запустится, пока extract не выполнится успешно (или не будет обработан в соответствии с настройками).

    Архитектура Airflow на высоком уровне

    Airflow состоит из нескольких компонентов.

  • Webserver
  • Scheduler
  • Metadata Database
  • Executor
  • Workers (в зависимости от типа executor)
  • !Как основные компоненты Airflow взаимодействуют друг с другом

    Кратко по ролям:

  • Webserver показывает UI, графы, логи и статусы.
  • Scheduler читает DAG-файлы, решает что и когда запускать, создаёт Task Instance и отправляет их на исполнение.
  • Metadata Database хранит состояние: DAG Runs, Task Instances, расписания, переменные, подключения.
  • Executor определяет как задачи исполняются (локально, на воркерах, в Kubernetes и т.д.).
  • Workers выполняют задачи (актуально для распределённых executor-ов).
  • Справка по executor-ам в документации:

  • Executors в Airflow
  • Планирование и время в Airflow: базовая модель

    Планирование — источник многих ошибок новичков, поэтому важно зафиксировать терминологию.

  • Расписание определяет, какие интервалы данных должны быть обработаны.
  • Data interval — интервал данных, за который отвечает конкретный DAG Run.
  • Логическая дата (logical date) — метка времени, связанная с запуском и его интервалом данных.
  • Практический смысл: когда вы говорите «ежедневный DAG», Airflow создаёт запуски, каждый из которых отвечает за конкретный день данных.

    Также важны настройки:

  • start_date — с какого момента расписание начинает действовать.
  • catchup — нужно ли «догонять» пропущенные интервалы (если DAG был выключен/не существовал).
  • Подробно:

  • DAG Runs и расписание
  • Timetables и расписания
  • Надёжность: ретраи, таймауты, идемпотентность

    Airflow помогает управлять сбоями, но требует правильного дизайна задач.

    Ретраи и политики ошибок

    Частые настройки на уровне задач:

  • retries — число повторных попыток
  • retry_delay — пауза между попытками
  • execution_timeout — ограничение по времени выполнения
  • Смысл: временные проблемы (сеть, кратковременная недоступность сервиса) часто решаются повтором.

    Идемпотентность

    Идемпотентная задача при повторном запуске приводит систему к тому же корректному состоянию, что и при первом. Это критично, потому что ретраи и ручные перезапуски — нормальная часть эксплуатации.

    Примеры практик:

  • писать результаты с уникальным ключом (например, по дате интервала)
  • сначала писать во временную таблицу/файл, затем атомарно переключать
  • избегать «добавления поверх» без контроля дублей
  • Интеграции: Connections, Variables, Hooks, Sensors

    Airflow умеет подключаться ко многим системам, но есть общие механизмы.

    Connections

    Connection — сохранённые параметры подключения (хост, логин, токен, extra). Это помогает:

  • не хранить секреты в коде DAG
  • централизованно менять параметры
  • Документация:

  • Managing Connections
  • Variables

    Variable — хранение конфигурации (например, путь к бакету, фича-флаг, имя схемы). Переменные удобны, но не предназначены для секретов.

  • Variables
  • Hooks

    Hook — слой для работы с внешней системой (подключение, типовые операции). Многие операторы используют hooks внутри.

    Sensors

    Sensor — задача ожидания события: появления файла, обновления таблицы, завершения внешней джобы. Сенсоры помогают строить пайплайны, которые корректно зависят от внешнего мира.

    Наблюдаемость: UI, логи, статусы

    В Airflow удобно смотреть:

  • граф DAG и статус задач
  • историю запусков
  • логи каждой попытки задачи
  • причины падений и ретраи
  • Практический принцип: если пайплайн нельзя удобно наблюдать и отлаживать, то он будет дорогим в поддержке.

    Минимальный пример DAG

    Ниже пример, который показывает структуру: DAG, две задачи и зависимость.

    Что здесь важно:

  • with DAG(...) as dag задаёт контекст, в котором объявляются задачи.
  • task_id должен быть уникальным внутри DAG.
  • t1 >> t2 означает зависимость: сначала t1, потом t2.
  • catchup=False отключает автоматическое создание «пропущенных» запусков.
  • Как читать и обсуждать пайплайны Airflow

    Чтобы одинаково понимать пайплайн в команде, полезно проговаривать:

  • какой data interval обрабатывает каждый запуск
  • какая точка входа и какие внешние зависимости
  • какие гарантии у задач (идемпотентность, дедупликация)
  • что считается успехом и где лежит результат
  • какая стратегия ретраев и алёртов
  • Что дальше в курсе

    В следующих материалах логично перейти от терминов к практике:

  • установка Airflow и разбор минимальной конфигурации
  • написание DAG-ов: операторы, TaskFlow API, зависимости
  • расписания, backfill и работа с интервалами данных
  • Connections и интеграции
  • наблюдаемость, тестирование и лучшие практики эксплуатации
  • 2. Установка и настройка окружения Airflow

    Установка и настройка окружения Airflow

    В предыдущей статье мы разобрали, что Airflow — это оркестратор, где пайплайн описывается как DAG (граф задач), а затем планируется и исполняется компонентами Airflow (scheduler, webserver, metadata DB, executor). Чтобы перейти к практике и писать первые DAG-и, нужно развернуть окружение так, чтобы:

  • DAG-файлы подхватывались автоматически
  • задачи могли запускаться (хотя бы локально)
  • были доступны UI и логи
  • В этой статье разберём два самых частых способа:

  • установка через pip в виртуальном окружении Python (удобно для изучения и разработки)
  • запуск через Docker Compose (удобно, чтобы быстро получить окружение, похожее на боевое)
  • Какой способ установки выбрать

    Оба варианта подходят для обучения. Выбор зависит от того, что вам важнее: простота Python-разработки или быстрый старт со стандартной инфраструктурой.

    | Способ | Когда выбирать | Плюсы | Минусы | |---|---|---|---| | pip + venv | хотите максимально просто писать Python-код и отлаживать локально | легко ставить/обновлять пакеты, понятная структура | больше ручной настройки, база часто SQLite (не для продакшена) | | Docker Compose | хотите окружение ближе к реальному (webserver, scheduler, БД, иногда Redis) | воспроизводимость, минимум зависимости от ОС | нужен Docker, чуть сложнее интеграция с локальным Python-окружением |

    Официальные разделы документации:

  • Installation
  • Docker Compose
  • Базовые понятия окружения Airflow

    Перед установкой важно понимать несколько терминов, которые вы будете встречать в командах и конфигурации.

  • AIRFLOW_HOME — папка, где Airflow хранит:
  • - airflow.cfg (конфигурация) - dags/ (ваши DAG-файлы) - logs/ (логи выполнения задач) - plugins/ (плагины, если вы их используете)
  • Metadata database — база данных, где Airflow хранит состояние DAG Runs, Task Instances, пользователей, Connections и т.д.
  • Webserver — UI для просмотра DAG-ов, запусков и логов.
  • Scheduler — компонент, который читает DAG-и и планирует выполнение задач.
  • !Структура каталога Airflow и связь папок с компонентами

    Вариант A: установка через pip в виртуальном окружении

    Предварительные требования

  • установлен Python подходящей версии (версию Airflow всегда проверяйте в документации к вашей версии Airflow)
  • установлен pip
  • желательно установить venv (обычно доступен вместе с Python)
  • Если вы работаете в команде, полезная привычка — фиксировать версию Airflow в проекте и обновлять осознанно.

    Шаг 1. Создайте виртуальное окружение

    На Windows активация может отличаться, но идея та же: вы изолируете зависимости проекта от системного Python.

    Шаг 2. Укажите AIRFLOW_HOME

    Airflow по умолчанию создаёт домашнюю папку (часто ~/airflow). Для учебного проекта удобнее хранить всё рядом с кодом.

    Если вы закроете терминал, переменная окружения исчезнет. Для постоянства её можно добавить в профиль оболочки (это зависит от вашей ОС и shell).

    Шаг 3. Установите Airflow с constraints

    Airflow — большой проект с множеством зависимостей. Чтобы избежать конфликтов версий, Airflow предлагает устанавливать зависимости через файл constraints.

    Документация про установку и constraints:

  • Installing Apache Airflow
  • Пример установки (версию подставьте осознанно, как в вашем проекте):

    Поместите туда DAG-файл (например, из предыдущей статьи hello_airflow). Scheduler подхватит его автоматически.

    Вариант B: запуск через Docker Compose

    Этот путь хорош тем, что вы получаете готовую связку компонентов и меньше зависите от настроек локального Python.

    Официальная инструкция:

  • Running Airflow in Docker
  • Шаг 1. Подготовьте проектную папку

    Создайте директорию проекта, в которой будут:

  • docker-compose.yaml
  • папки для DAG-ов, логов, плагинов
  • Обычно структура такая:

    Шаг 2. Возьмите официальный docker-compose.yaml

    В документации Airflow приведён актуальный файл docker-compose.yaml для вашего релиза. Скопируйте его из раздела Docker Compose (важно брать именно официальный пример под вашу версию).

    После этого вы сможете запускать:

    А затем:

    Практический смысл этих команд:

  • airflow-init подготавливает базу и первичную конфигурацию
  • docker compose up поднимает сервисы (webserver, scheduler и т.д.)
  • Шаг 3. Проверьте UI и добавление DAG-ов

  • UI обычно доступен на http://localhost:8080
  • DAG-файлы кладутся в локальную папку dags/, которая примонтирована внутрь контейнеров
  • Минимальная настройка конфигурации

    Где настраивается Airflow

    Airflow читает конфигурацию из:

  • airflow.cfg (файл внутри AIRFLOW_HOME)
  • переменных окружения (часто удобнее в контейнерах и CI)
  • Справочник по параметрам:

  • Configuration Reference
  • CLI and Environment Variables Reference
  • Пара ключевых параметров для старта

  • dags_folder — где Airflow ищет DAG-и
  • load_examples — загружать ли примеры DAG-ов
  • - для обучения иногда удобно включить - для своего проекта чаще отключают, чтобы не засорять UI
  • executor — способ выполнения задач
  • - локально для старта это может быть простой executor - в распределённых окружениях выбирают другой режим (это отдельная большая тема)

    Чтобы быстро отключить примеры в локальной установке, часто правят airflow.cfg или задают переменную окружения (способ зависит от того, как вы запускаете Airflow).

    Проверка работоспособности окружения

    Признаки того, что всё настроено правильно:

  • webserver открывается, вы можете войти пользователем
  • scheduler запущен и видит DAG-и
  • при включении DAG-а создаются DAG Runs (если расписание это предполагает)
  • у задач появляются логи (в UI можно открыть лог конкретной попытки)
  • Полезные команды для диагностики:

    Частые проблемы и как их распознать

  • DAG не появляется в UI
  • - файл лежит не в dags_folder - в DAG-файле ошибка Python (scheduler не может импортировать модуль) - dag_id конфликтует или файл не подхвачен из-за прав доступа
  • UI работает, но задачи не запускаются
  • - не запущен scheduler - неверно настроен executor - задача стоит в очереди из-за ограничений параллельности
  • Нет логов или логи не открываются
  • - проблема с путями logs/ - в Docker — проблема с volume/правами на директорию

    Что дальше

    Теперь у вас есть рабочее окружение: UI, scheduler и место для DAG-файлов. В следующем шаге курса логично перейти к практике написания DAG-ов: базовые операторы и TaskFlow API, зависимости между задачами и первые шаблоны разработки, которые упрощают поддержку пайплайнов.

    3. Создание DAG: операторы, задачи и зависимости

    Создание DAG: операторы, задачи и зависимости

    После установки Airflow (UI, scheduler, папка dags/) следующий практический шаг — научиться писать DAG как код: объявлять задачи, выбирать подходящие операторы и задавать зависимости так, чтобы пайплайн был понятным, надёжным и удобным в сопровождении.

    В этой статье мы разберём:

  • как устроен DAG-файл и когда он исполняется
  • что такое задача и оператор на практике
  • два основных стиля написания DAG-ов: классический и TaskFlow API
  • как задавать зависимости (цепочки, параллельные ветки, объединение)
  • типичные ошибки новичков при создании DAG-ов
  • Как Airflow читает DAG-файлы

    Airflow не запускает ваш DAG-файл как скрипт по расписанию. Вместо этого:

  • scheduler регулярно импортирует Python-файлы из папки dags/
  • при импорте Python-код выполняется, и Airflow получает описание DAG-а и задач
  • затем scheduler создаёт запуски (DAG Run) и запускает задачи согласно зависимостям
  • Отсюда важное правило: DAG-файл должен быстро импортироваться.

    Практические следствия:

  • не делайте тяжёлые вычисления, долгие сетевые запросы и чтение больших файлов на уровне импорта
  • всё «тяжёлое» переносите внутрь задач
  • Документация:

  • DAG file processing
  • Минимальная структура DAG

    Самый понятный старт — классический стиль с контекстным менеджером with DAG(...).

    Здесь:

  • dag_id — уникальный идентификатор DAG-а
  • start_date и schedule задают модель расписания (подробнее было во введении)
  • catchup=False запрещает «догонять» пропущенные интервалы
  • task_id — уникальный идентификатор задачи внутри DAG-а
  • print_date >> say_hello — зависимость: сначала первая задача, потом вторая
  • Задача, оператор и выполнение: как не путаться

    В Airflow легко запутаться в терминах, поэтому зафиксируем их на практике.

  • Operatorтип действия (шаблон выполнения). Например, выполнить bash-команду или вызвать Python-функцию.
  • Taskконкретный шаг в DAG-е, созданный на основе оператора и имеющий task_id.
  • Task Instanceконкретная попытка выполнения задачи в рамках конкретного DAG Run.
  • Один операторный класс (BashOperator) может порождать много задач (разные task_id), а одна задача может исполняться много раз (каждый запуск DAG-а) и иметь ретраи.

    Операторы: какие бывают и как выбрать

    Оператор выбирают по тому, где фактически выполняется работа.

  • BashOperator — удобен для вызова утилит, скриптов, CLI.
  • PythonOperator — запуск Python-функции (полезно для простой логики и интеграций).
  • SQL-операторы и провайдеры — для запросов в базы данных и DWH.
  • Операторы для Kubernetes, Spark, cloud-сервисов — для запуска внешних джоб.
  • Рекомендация по стилю:

  • Airflow должен оркестрировать, а не заменять вычислительный слой
  • тяжёлые трансформации лучше запускать во внешних системах, а в Airflow оставлять управление зависимостями, ретраями и наблюдаемостью
  • Справочник операторов и провайдеров:

  • Operators
  • Providers packages
  • PythonOperator: пример с функцией

    PythonOperator вызывает вашу функцию. Главный принцип: функция должна быть идемпотентной (повторный запуск не ломает данные), потому что ретраи и ручные перезапуски — нормальная практика.

    Здесь schedule=None означает, что DAG не будет запускаться по расписанию и предназначен для ручного запуска или запуска через API.

    TaskFlow API: более современный стиль DAG-ов

    TaskFlow API — это «питоничный» способ писать DAG-и через декораторы. Под капотом всё равно создаются обычные задачи, но код часто получается чище, а передача данных — естественнее.

    Ключевые элементы:

  • @dag — функция, внутри которой описан пайплайн
  • @task — функция-задача
  • Важно понимать, что data = extract() здесь не означает «функция выполнилась прямо сейчас». На этапе импорта DAG-файла Airflow строит граф задач, а сами функции выполняются уже на воркере при запуске.

    Как задавать зависимости между задачами

    Линейная цепочка

  • a >> b означает: b зависит от a
  • b << a означает то же самое, только читается иначе
  • Параллельные ветки

    Одна задача может быть родителем для нескольких.

    Объединение после параллельной обработки

    Несколько задач могут сходиться в одну.

    !Пример параллельных веток и объединения в DAG

    Практическое правило про зависимости

    Зависимости задают порядок и условия старта, но не гарантируют передачу данных.

    Если вам нужно передать небольшие значения между задачами, в Airflow есть механизм XCom.

  • В TaskFlow API возврат значения из @task обычно автоматически становится XCom.
  • В классическом стиле XCom тоже возможен, но требует явной работы через контекст выполнения.
  • Документация:

  • XComs
  • Параметры задач: ретраи, таймауты, владелец

    Часто используемые параметры задач:

  • retries — сколько раз повторить при ошибке
  • retry_delay — пауза между попытками
  • execution_timeout — лимит времени выполнения
  • Пример:

    Практический смысл:

  • retries и retry_delay помогают переживать временные сбои
  • execution_timeout защищает от «вечных» зависаний
  • Документация:

  • Retries
  • Где хранить конфигурацию: Variables и Connections

    Чтобы DAG-и были переносимыми и безопасными:

  • секреты (пароли, токены) храните в Connections, а не в коде
  • несекретную конфигурацию (например, имя схемы, имя бакета, флаги) храните в Variables
  • Документация:

  • Managing Connections
  • Variables
  • Типичные ошибки при создании DAG-ов

    Тяжёлый код на уровне импорта

    Плохо:

  • скачивать данные из API при импорте файла
  • читать большие файлы
  • открывать сетевые соединения
  • Хорошо:

  • выносить работу внутрь задач (PythonOperator, @task, внешние операторы)
  • Неидемпотентные задачи

    Если задача пишет результат «поверх» без контроля дублей, то ретрай или ручной перезапуск создаст повреждённые данные.

    Безопасные подходы:

  • писать данные с ключом интервала (например, дата или logical date)
  • использовать временную таблицу/файл и затем атомарно переключать
  • делать дедупликацию по ключам
  • Смешивание ответственности

    Airflow лучше использовать для:

  • зависимостей
  • расписания
  • ретраев
  • наблюдаемости
  • А вычисления — делегировать специализированным системам.

    Рекомендованный мини-чеклист качества DAG-а

  • DAG-файл быстро импортируется
  • dag_id и task_id стабильны и понятны
  • задачи идемпотентны или имеют явные защиты от дублей
  • зависимости отражают реальный порядок и условия выполнения
  • ретраи и таймауты настроены там, где это оправдано
  • секреты не лежат в коде, используются Connections
  • Что дальше

    Теперь вы умеете собирать DAG-и из задач и операторов и задавать зависимости, чтобы получить понятный граф исполнения. Следующий логичный шаг — глубже разобраться в планировании (интервалы данных, start_date, catchup, backfill), а также в практиках разработки DAG-ов для командной работы: структура репозитория, тестирование, код-стайл и переиспользование компонентов.

    4. Расписания, параметры, XCom и переменные

    Расписания, параметры, XCom и переменные

    После того как вы научились поднимать окружение Airflow и писать DAG-и с задачами и зависимостями, следующий шаг — разобраться, как Airflow решает, когда создавать запуск, как прокидывать конфигурацию в задачи, как передавать небольшие данные между задачами и где хранить настройки пайплайнов.

    В этой статье разберём четыре практических блока:

  • расписания и модель времени в Airflow
  • параметры запуска и шаблонизация
  • XCom для передачи небольших значений
  • Variables для конфигурации DAG-ов
  • Расписания: как Airflow создаёт DAG Run

    Airflow не «запускает DAG по расписанию» в смысле cron-скрипта. Вместо этого scheduler создаёт DAG Run, каждый из которых отвечает за интервал данных.

    Ключевые сущности времени:

  • schedule задаёт, как часто нужно создавать запуски.
  • data interval описывает, какой диапазон данных обрабатывает конкретный DAG Run.
  • logical date — логическая метка запуска, связанная с интервалом данных (это не обязательно момент фактического старта).
  • Официально про запуски и интервалы:

  • DAG Runs
  • !Временная модель: интервал данных и логическая дата для ежедневного расписания

    start_date, schedule и catchup

    В определении DAG-а почти всегда встречаются три настройки:

  • start_date — с какого момента расписание начинает действовать.
  • schedule — правило генерации запусков.
  • catchup — нужно ли создавать пропущенные исторические запуски.
  • Пример:

    Практический смысл:

  • schedule="@daily" создаёт ежедневные интервалы данных.
  • catchup=False говорит scheduler-у: не создавать задним числом все запуски с 2025-01-01 до сегодня, если DAG включили позднее.
  • Про расписания и их типы:

  • Timetables и расписания
  • Preset-расписания и cron

    Airflow поддерживает:

  • preset-строки, например @daily, @hourly, @weekly.
  • cron-выражения, например 0 2 *.
  • Рекомендации:

  • используйте preset-расписания, когда подходит типовая периодичность
  • используйте cron, когда нужен точный контроль времени
  • DAG без расписания

    Если schedule=None, DAG не будет создавать DAG Run автоматически.

    Типичные случаи:

  • ручные/разовые пайплайны
  • пайплайны, запускаемые внешней системой через API
  • пайплайны, которые вы хотите триггерить только при событии
  • Параметры и конфигурация запуска

    В реальной работе один и тот же DAG часто нужно запускать с разными настройками:

  • выбрать схему/таблицу назначения
  • включить или выключить часть шагов
  • изменить поведение ретеншна
  • прогнать пайплайн для конкретной даты или набора сущностей
  • В Airflow есть два основных механизма:

  • Params — параметры DAG-а и задач с дефолтами (удобно для декларативной конфигурации).
  • dag_run.conf — конфигурация конкретного запуска (передаётся при ручном Trigger).
  • Params: параметры DAG-а и задач

    Params — это словарь параметров, который можно определить на уровне DAG-а (и при необходимости переопределять на уровне задач). Эти параметры удобно использовать в шаблонах.

    Документация:

  • Params
  • Пример с параметром окружения:

    dag_run.conf: конфигурация конкретного запуска

    Когда вы вручную запускаете DAG из UI или через API, можно передать JSON-конфигурацию. Она доступна как dag_run.conf.

    Сценарии:

  • разовый прогон с необычными аргументами
  • отладка
  • запуск пайплайна внешней системой, которая передаёт параметры
  • Пример использования в шаблоне:

    Практическое правило:

  • params хороши как дефолтная конфигурация пайплайна
  • dag_run.conf хорош как конфигурация конкретного запуска
  • Шаблонизация: как подставлять значения в команды

    Многие операторы поддерживают шаблонизацию полей (например, bash_command). Шаблоны пишутся на Jinja и имеют доступ к контексту выполнения.

    Документация:

  • Шаблоны и контекст
  • Часто используемые значения контекста:

  • {{ ds }} — дата интервала в формате YYYY-MM-DD
  • {{ logical_date }} — логическая дата (как объект времени, обычно выводится как timestamp)
  • {{ params.<name> }} — параметр из params
  • {{ dag_run.conf.get('key') }} — значение из конфигурации запуска
  • Важно:

  • шаблоны вычисляются во время выполнения задачи, а не при импорте DAG-файла
  • это один из стандартных способов сделать задачи идемпотентными, привязывая пути/таблицы к дате интервала
  • XCom: передача небольших данных между задачами

    Иногда нужно, чтобы одна задача отдала другой небольшой результат:

  • путь к созданному файлу
  • идентификатор внешней джобы
  • число строк в загруженной таблице
  • Для этого в Airflow есть XCom (cross-communication) — механизм обмена небольшими сообщениями через metadata database.

    Документация:

  • XComs
  • XCom в TaskFlow API

    В TaskFlow API возврат значения из @task автоматически превращается в XCom.

    XCom в классическом стиле

    В классическом стиле XCom можно писать и читать явно.

    Ограничения и лучшие практики XCom

    XCom хранится в metadata database, поэтому это не транспорт для больших данных.

    Практические правила:

  • передавайте через XCom только небольшие значения (идентификаторы, пути, статусы)
  • большие данные храните во внешнем хранилище (S3, GCS, HDFS, DWH), а через XCom передавайте только ссылку
  • следите за стабильностью ключей и task_id, иначе чтение XCom станет хрупким
  • !Идея XCom: передаём ссылку на данные, а не данные

    Variables: централизованная конфигурация DAG-ов

    Variable — это запись в metadata database, которую можно читать из DAG-ов и задач. Обычно это несекретные настройки, которые удобно менять без правки кода.

    Документация:

  • Variables
  • Примеры того, что хранить в Variables:

  • имя окружения dev/stage/prod
  • название бакета или базовой директории
  • фиче-флаги, переключающие ветки пайплайна
  • список сущностей для обработки
  • Что не стоит хранить в Variables:

  • пароли и токены в открытом виде
  • Для секретов используйте Connections или секрет-хранилище, подключённое к Airflow.

    Чтение Variable в коде

    Если вы храните JSON:

    Variables против Params

    Оба механизма похожи тем, что это конфигурация, но применяются по-разному.

    | Механизм | Где задаётся | Когда удобно | Ограничения | |---|---|---|---| | params | в коде DAG-а (с дефолтами), можно переопределять на запуске | когда нужны явные дефолты рядом с кодом и шаблонами | изменение дефолтов требует деплоя DAG | | Variable | в UI/CLI/API в metadata database | когда нужно менять поведение без изменения кода | риск превратить Variables в «скрытый конфиг», если не документировать |

    Практический чеклист

    Перед тем как считать DAG готовым к регулярным запускам, проверьте:

  • расписание соответствует тому, какой интервал данных должен обрабатываться
  • catchup выставлен осознанно
  • параметры запуска оформлены через params и шаблоны там, где это упрощает поддержку
  • через XCom передаются только небольшие значения, а не данные
  • конфигурация пайплайна хранится в Variables, а секреты вынесены из кода
  • Что дальше

    Теперь у вас есть полный набор базовых инструментов, чтобы сделать DAG управляемым:

  • расписание создаёт правильные интервалы
  • параметры и шаблоны позволяют переиспользовать код
  • XCom позволяет прокинуть небольшие результаты между задачами
  • Variables централизуют конфигурацию
  • Следующий логичный шаг в курсе — перейти к практикам разработки и эксплуатации: структура репозитория DAG-ов, тестирование, алёрты, ограничения параллельности, пулы и работа с окружениями.

    5. Подключения, хуки, провайдеры и интеграции

    Подключения, хуки, провайдеры и интеграции

    В предыдущих статьях вы научились поднимать окружение Airflow, писать DAG-и, настраивать расписания, передавать небольшие значения через XCom и хранить конфигурацию в Variables. Дальше логично перейти к теме, без которой Airflow редко используют в реальных проектах: как Airflow подключается к внешним системам и как эти интеграции правильно устроены.

    Airflow почти всегда выступает оркестратором: он управляет порядком шагов, ретраями и наблюдаемостью, а фактическая работа часто происходит во внешних системах: базах данных, DWH, объектных хранилищах, API, Kubernetes, Spark.

    Зачем нужны интеграции в Airflow

    В типичном пайплайне вам нужно:

  • выполнить SQL в базе данных
  • скачать данные из HTTP API
  • положить файлы в S3-совместимое хранилище
  • запустить job в Kubernetes
  • дождаться готовности таблицы или файла
  • Чтобы решать эти задачи стандартно и безопасно, в Airflow есть связка Providers → Operators/Sensors → Hooks → Connections.

    !Как элементы интеграций связаны между собой

    Providers: пакеты интеграций

    Provider — это пакет, который добавляет поддержку конкретной экосистемы: операторы, хуки, сенсоры, типы подключений.

  • Airflow core содержит базовые возможности и общие компоненты.
  • Интеграции вынесены в отдельные пакеты провайдеров, чтобы их можно было обновлять независимо.
  • Официальный каталог провайдеров:

  • Airflow Providers Packages
  • Установка провайдера

    Если вы хотите работать, например, с PostgreSQL, обычно ставят провайдер:

    Если вы используете Docker Compose, провайдеры либо уже есть в образе, либо их добавляют через кастомный образ (это зависит от того, как устроен ваш проект).

    Практические правила:

  • фиксируйте версии Airflow и провайдеров (особенно в продакшене)
  • проверяйте совместимость версий в документации релиза
  • Connections: где и как хранить параметры подключения

    Connection — это сущность Airflow, которая хранит параметры доступа к внешней системе:

  • хост
  • порт
  • логин
  • пароль или токен
  • схема или база (зависит от типа)
  • поле extra для дополнительных настроек (часто в JSON)
  • Документация:

  • Managing Connections
  • Почему Connections важнее, чем “зашить секреты в код”

    Если вы положите токены и пароли прямо в DAG-файлы, вы быстро получите проблемы:

  • утечки в Git
  • разные секреты для dev/stage/prod
  • сложность ротации
  • Connections дают стандартный способ хранить и менять параметры подключения без изменения кода DAG-а.

    conn_id и соглашения об именовании

    Каждое подключение имеет идентификатор conn_id. Его обычно используют в операторах и хуках.

    Рекомендации:

  • делайте conn_id стабильным и понятным
  • отражайте систему и окружение, если это важно
  • Примеры:

  • postgres_dwh
  • s3_raw
  • http_payments_api
  • Где задаются Connections

    Чаще всего Connections задают одним из способов:

  • через UI Airflow
  • через CLI
  • через переменные окружения (особенно удобно в контейнерах)
  • через секрет-хранилища (Secrets Backend)
  • Документация по Secrets Backend:

  • Secrets Backend
  • Connections через переменные окружения

    Airflow поддерживает передачу connection URI через переменные окружения формата AIRFLOW_CONN_<CONN_ID>.

    Документация:

  • Environment Variables
  • Пример (идея, конкретный URI зависит от типа подключения):

    Практический смысл:

  • удобно для Kubernetes и Docker
  • секреты можно прокидывать через secret manager платформы
  • Connections и Variables: что куда

    | Механизм | Для чего | Секреты | Когда менять | |---|---|---|---| | Connection | доступ к внешним системам | да | без изменения кода DAG | | Variable | конфигурация пайплайна | обычно нет | без изменения кода DAG | | params | дефолтные параметры запуска рядом с кодом | нет | через деплой DAG или override при запуске |

    Главное правило: секреты храните в Connections или в секрет-хранилище, а не в Variables и не в коде.

    Hooks: единый “клиент” для внешних систем

    Hook — это Python-класс, который знает, как:

  • взять нужное подключение по conn_id
  • создать клиент или соединение
  • выполнить типовые операции (запрос, загрузку, листинг файлов)
  • Hooks нужны, чтобы:

  • не писать вручную логику подключения в каждой задаче
  • переиспользовать типовые методы
  • держать работу с внешней системой в одном месте
  • Концептуально:

  • Operator описывает шаг пайплайна
  • Hook реализует детали взаимодействия с системой
  • Как Operator связан с Hook

    Часто оператор просто оборачивает hook.

    Пример на уровне идеи:

  • PostgresOperator выполняет SQL
  • внутри использует PostgresHook, который достаёт параметры подключения из Connection
  • Это упрощает поддержку: вам не нужно самим открывать соединение и управлять деталями драйвера.

    Operators и Sensors в контексте интеграций

    Когда вы выбираете между оператором и сенсором, полезно помнить различие:

  • Operator делает действие (запрос, загрузка, запуск job)
  • Sensor ждёт условия (появился файл, таблица обновилась, job завершился)
  • При этом и оператор, и сенсор обычно используют hook и connection.

    Документация про сенсоры:

  • Sensors
  • Практика: безопасное использование Connections в Python-задаче

    Иногда стандартного оператора недостаточно, и вы хотите работать через hook в @task.

    Ниже пример, который показывает сам принцип: как получить параметры подключения по conn_id. Это удобно для отладки и для кастомной логики.

    Что важно:

  • BaseHook.get_connection() достаёт Connection из настроек Airflow (или из secrets backend, если он включён)
  • пароли и токены нельзя писать в логи
  • бизнес-данные через XCom передавать не нужно, а вот “метаданные” вроде хоста или схемы для диагностики могут быть полезны
  • Практика: провайдерные хуки и операторы

    В реальной работе чаще используют провайдерные хуки и операторы. Общий подход такой:

  • для простого SQL используйте оператор
  • для более сложной логики используйте hook внутри PythonOperator или @task
  • Пример для SQL через оператор зависит от конкретного провайдера, но паттерн почти всегда одинаковый:

  • у оператора есть параметр вроде conn_id или postgres_conn_id
  • сам оператор берёт Connection и выполняет действие
  • Поле extra: как хранить нестандартные настройки

    У многих систем есть настройки, которые не помещаются в стандартный набор полей “host/login/password”. Для этого в Connection есть extra.

    Типичные примеры:

  • параметры SSL
  • region для облака
  • флаги поведения клиента
  • Практические правила:

  • документируйте, какие ключи вы храните в extra
  • не делайте extra “свалкой всего” без структуры
  • Лучшие практики интеграций

    Разделяйте код DAG-а и код интеграций

    Чтобы DAG-и были читаемыми и быстро импортировались:

  • не создавайте клиентов внешних систем на уровне импорта DAG-файла
  • выносите сложную логику в функции задач или отдельные модули
  • используйте hooks и provider-операторы вместо “самописного подключения”
  • Делайте задачи идемпотентными

    Интеграции почти всегда сталкиваются с повторами:

  • ретраи задач
  • ручные перезапуски
  • backfill
  • Поэтому:

  • пишите результаты так, чтобы повтор не портил данные
  • используйте {{ ds }} или {{ logical_date }} в путях и именах таблиц, если это помогает (шаблонизацию вы разбирали в предыдущей статье)
  • Не передавайте данные через XCom

    XCom предназначен для небольших значений:

  • идентификатор job
  • путь к файлу
  • число обработанных строк
  • Сами данные храните во внешних системах, а через XCom передавайте только ссылки.

    Выбирайте место хранения секретов осознанно

    Варианты:

  • Connections в metadata database
  • Secrets Backend (Vault, AWS Secrets Manager и другие)
  • переменные окружения с connection URI
  • Выбор зависит от вашей инфраструктуры, но принцип одинаков: секреты не должны жить в DAG-коде.

    Итоги

    В Airflow интеграции строятся по стандартной модели:

  • Providers добавляют интеграции с системами
  • Connections хранят параметры доступа и секреты
  • Hooks инкапсулируют работу с внешними системами
  • Operators и Sensors используют hooks, чтобы выполнять действия или ожидать события
  • После этого блока вы готовы проектировать DAG-и, которые безопасно и повторяемо взаимодействуют с внешними системами, оставаясь при этом управляемыми через UI и конфигурацию Airflow.

    6. Мониторинг, логирование, алерты и отладка

    Мониторинг, логирование, алерты и отладка

    Airflow обычно внедряют не только ради расписания и зависимостей, но и ради наблюдаемости: чтобы быстро понять, что сломалось, где, почему и что с этим делать. В предыдущих статьях вы научились создавать DAG-и, настраивать расписания, использовать XCom/Variables и подключаться к внешним системам через провайдеры и Connections. Теперь соберём это в эксплуатационную картину: как смотреть состояние пайплайнов, читать логи, настраивать алерты и эффективно отлаживать задачи.

    Полезные разделы документации:

  • Logging and Monitoring
  • Airflow UI
  • Tasks
  • Что именно мы мониторим в Airflow

    В Airflow есть несколько уровней, где возникают проблемы — и важно понимать, на каком уровне вы сейчас ищете причину.

  • DAG
  • DAG Run
  • Task Instance
  • Инфраструктура Airflow (scheduler, webserver, metadata database, executor/воркеры)
  • Внешние системы (DWH, S3, API, Kubernetes), которые Airflow только оркестрирует
  • Практический вывод: если упала задача, причина может быть как в вашем коде/SQL, так и в подключении (Connection), лимитах параллельности, недоступности внешней системы или проблемах самого scheduler.

    !Карта того, откуда брать сигналы при проблемах

    Airflow UI как центральная панель диагностики

    UI — это первое место, куда обычно идут при инциденте.

    Какие представления в UI чаще всего используют

  • DAGs list
  • Grid
  • Graph
  • Gantt
  • Task Instance details
  • Logs
  • Минимальный алгоритм разбора инцидента через UI

  • Откройте DAG и найдите проблемный DAG Run.
  • В Grid определите, какая задача упала первой и является источником каскада.
  • Откройте лог конкретной попытки (помните про retries: попыток может быть несколько).
  • Проверьте, не связано ли падение с внешней системой:
  • - Connection/права доступа - таймауты сети - лимиты/квоты - изменения схемы/таблицы
  • Если задача не падает, но «висит» в статусе ожидания, проверьте:
  • - очередь и executor - ограничения параллельности и пулы - сенсоры (ожидание события)

    Логирование: какие логи бывают и где их искать

    В Airflow есть два больших класса логов.

  • Task logs
  • Service logs
  • Task logs: главный источник правды о падении задачи

    Task log — это лог конкретной Task Instance (конкретной попытки выполнения задачи в конкретном DAG Run). Именно здесь вы чаще всего увидите:

  • трассировку исключения Python
  • текст ошибки драйвера базы данных
  • HTTP-код ответа API
  • вывод bash-команды
  • сообщения вашего кода (например, через logging)
  • Практические моменты:

  • Всегда смотрите правильную попытку: иногда первая попытка упала по временной причине, а вторая прошла.
  • В распределённом исполнении лог мог быть создан на воркере, а затем сохранён/отправлен в удалённое хранилище (зависит от настройки логирования).
  • Service logs: когда проблема не в задаче

    Service logs — это логи компонентов Airflow. Они нужны, когда:

  • DAG не появляется в UI
  • DAG не стартует по расписанию
  • задачи стоят в статусе queued, но не исполняются
  • есть подозрение на проблему с metadata DB, scheduler, executor
  • На практике чаще всего читают:

  • логи scheduler (обработка DAG-файлов, создание DAG Run, постановка задач в очередь)
  • логи webserver (реже, обычно при проблемах UI)
  • логи worker/executor (ошибки запуска процессов, проблемы окружения)
  • Типичная проблема: DAG не виден или не обновляется

    Причины часто такие:

  • Python-ошибка при импорте DAG-файла
  • тяжёлый код на уровне импорта (долго импортируется)
  • не та папка dags_folder
  • Что делать:

  • в UI обычно видно раздел с ошибками импорта
  • дополнительно проверьте service logs scheduler
  • Алерты: как сделать так, чтобы команда узнала о проблеме вовремя

    Алертинг в Airflow можно построить несколькими слоями. Важно не пытаться «уведомлять обо всём»: цель алертов — быстро поймать реальные инциденты, а не создать шум.

    Что имеет смысл алертить

  • падение критичных задач или всего DAG
  • превышение ожидаемого времени выполнения (особенно для ежедневных витрин)
  • отсутствие запусков (DAG перестал запускаться)
  • деградации scheduler/executor (если вы мониторите инфраструктуру метриками)
  • Встроенные уведомления по email

    Во многих установках используют базовый механизм email-уведомлений на падения (он прост, но ограничен по гибкости). Логика настраивается на уровне задач или default_args DAG-а.

    Удобно как минимальный уровень, но в зрелой эксплуатации часто переходят на:

  • уведомления в чат
  • инцидент-менеджмент
  • централизованную обработку событий мониторинга
  • Callbacks: управляемые хуки на события задачи

    В Airflow есть механизм callbacks — функции, которые вызываются при событиях жизненного цикла задачи (например, при падении). Это позволяет отправлять алерты туда, куда принято в вашей компании.

    Например, часто используют on_failure_callback на уровне задач и/или DAG-а.

    Рекомендации:

  • делайте уведомления короткими, но полезными: dag_id, task_id, logical date, ссылка на лог
  • не логируйте секреты и не прикрепляйте чувствительные данные
  • учитывайте retries: иногда алерт нужен только после финального падения
  • Подробности о колбэках находятся в документации задач:

  • Tasks
  • Уведомления как отдельная задача в DAG

    Иногда удобнее завести отдельную задачу notify, которая срабатывает при падении upstream-задач по правилам trigger.

    Плюсы:

  • уведомление видно как часть DAG
  • можно централизовать формат сообщений
  • Минусы:

  • уведомление само становится задачей, которой нужен стабильный доступ к внешнему сервису
  • Практический принцип: чем проще и надёжнее канал уведомлений, тем лучше он работает в реальном инциденте.

    Метрики и мониторинг инфраструктуры Airflow

    Логи отвечают на вопрос почему упало, но метрики отвечают на вопрос как система ведёт себя в целом.

    Что обычно мониторят метриками:

  • задержку планирования (scheduler не успевает)
  • длину очередей (задачи копятся)
  • длительности задач и их распределения
  • стабильность работы воркеров/executor
  • Даже без глубокой настройки метрик полезно иметь базовые наблюдения:

  • DAG-ы стартуют вовремя
  • нет систематического роста длительности
  • нет массовых queued без исполнения
  • Если в вашей инфраструктуре есть Prometheus/Grafana или другой стек мониторинга, Airflow обычно интегрируют туда на уровне deployment.

    Отладка: как воспроизводить проблемы быстро и безопасно

    Главный принцип отладки Airflow-задач

    Отладка почти всегда эффективнее, когда вы можете:

  • воспроизвести проблему локально или в dev
  • запустить только одну задачу, а не весь DAG
  • увидеть, какие шаблоны реально подставились
  • Полезные CLI-команды для отладки

    Ниже команды, которые часто используют при разработке и расследовании.

  • airflow dags list-import-errors
  • airflow tasks test <dag_id> <task_id> <logical_date>
  • airflow dags test <dag_id> <logical_date>
  • airflow tasks render <dag_id> <task_id> <logical_date>
  • Что они дают:

  • list-import-errors быстро показывает проблемы импорта DAG-ов
  • tasks test запускает конкретную задачу в изоляции (очень полезно для воспроизведения)
  • dags test прогоняет DAG целиком в тестовом режиме
  • tasks render показывает, во что превратились Jinja-шаблоны (например, {{ ds }})
  • Справочник CLI команд и параметров:

  • CLI and Environment Variables Reference
  • Частые причины «странных» падений и как их распознать

  • Задача падает только по расписанию, но проходит при ручном запуске
  • Задача не падает, но постоянно ретраится
  • Задачи стоят в queued
  • Сенсор долго ждёт
  • Ошибки доступа к секретам/подключениям
  • Практический подход к расследованию:

  • Сначала зафиксируйте симптом в UI (какой статус, какая попытка, что в логах).
  • Проверьте, не отличается ли конфигурация окружения (dev/prod), Connections, Variables.
  • Если ошибка в шаблонизации путей/таблиц, используйте airflow tasks render.
  • Если проблема воспроизводится, минимизируйте её до одной задачи через airflow tasks test.
  • Если похоже на проблему планировщика или очереди — смотрите scheduler/service logs и метрики.
  • Практические правила «боевой» наблюдаемости

  • Идемпотентность важнее алертов: если задача безопасно переживает ретраи и перезапуски, инцидентов будет меньше.
  • Секреты не должны попадать в логи: используйте Connections и secrets backend, не печатайте пароли/токены.
  • Алерты должны быть полезными: лучше один алерт с ссылкой на лог и контекстом, чем десять шумных сообщений.
  • Не храните данные в XCom ради отладки: передавайте через XCom ссылки и идентификаторы, а не большие payload.
  • Итоги

    Мониторинг и отладка в Airflow строятся на четырёх опорах:

  • UI для статусов, графов, длительностей и быстрого входа в инцидент
  • Task logs как основной источник причины падения
  • Service logs и метрики для проблем планирования, очередей и инфраструктуры
  • Алертинг через email/колбэки/отдельные notification-задачи, чтобы команда узнавал о проблемах вовремя
  • С этим набором вы можете не только написать DAG, но и поддерживать его в эксплуатации: быстро находить причины сбоев, уменьшать время восстановления и делать пайплайны предсказуемыми.

    7. Деплой и масштабирование: Executors, Celery, Kubernetes

    Деплой и масштабирование: Executors, Celery, Kubernetes

    Airflow из предыдущих статей уже умеет запускать ваши DAG-и, показывать статусы в UI, хранить Connections/Variables и помогать с отладкой через логи и CLI. Но как только пайплайнов становится больше, а задачи начинают выполняться параллельно и дольше, возникает главный практический вопрос: где именно исполняются задачи и как масштабировать выполнение.

    В Airflow за это отвечает executor. В этой статье разберём:

  • что такое executor и как он связан со scheduler, воркерами и очередями
  • базовые режимы: локально и распределённо
  • CeleryExecutor: архитектура, когда выбирать, типовые настройки
  • KubernetesExecutor: архитектура, когда выбирать, особенности контейнеров
  • практические критерии выбора и чеклист продакшен-деплоя
  • Официальные ссылки для углубления:

  • Executors в Airflow
  • CeleryExecutor
  • KubernetesExecutor
  • Что такое executor и почему он критичен

    Executor в Airflow отвечает за то, как scheduler отдаёт задачи на выполнение.

    Упрощённо:

  • scheduler создаёт Task Instance и решает, что их пора запускать
  • executor принимает решение, где и каким образом запустить выполнение
  • задача реально исполняется либо в локальном процессе, либо на воркере, либо в Kubernetes Pod
  • !Как executor связывает scheduler и фактическое место выполнения задач

    Практический смысл выбора executor-а:

  • параллельность: сколько задач одновременно реально выполняется
  • надёжность: что происходит при падении воркера/ноды
  • изоляция окружения: могут ли разные задачи иметь разные зависимости
  • операционные затраты: насколько сложно поддерживать инфраструктуру
  • Минимальный набор компонентов для продакшена

    Даже если вы меняете executor, базовая «ось» Airflow почти всегда одна:

  • Metadata Database: обычно PostgreSQL (SQLite годится для обучения, но не для продакшена)
  • Scheduler: планирует и отдаёт задачи executor-у
  • Webserver: UI
  • Дальше зависит от executor-а:

  • для Celery добавляются broker и workers
  • для Kubernetes добавляются настройки кластера и шаблоны Pod-ов
  • Обзор executor-ов: что вы встретите чаще всего

    | Executor | Где выполняются задачи | Масштабирование | Когда выбирать | |---|---|---|---| | SequentialExecutor | последовательно на одной машине | почти нет | только как учебный/совсем простой режим | | LocalExecutor | параллельные процессы на одной машине | вертикально (CPU/RAM) | dev/небольшие инсталляции, простая эксплуатация | | CeleryExecutor | на распределённых воркерах через брокер | горизонтально (добавляем workers) | классический вариант для on-prem и VM, много задач | | KubernetesExecutor | каждая задача в отдельном Kubernetes Pod | горизонтально через кластер | если у вас Kubernetes и важна контейнерная изоляция |

    Важно: выбор executor-а не отменяет остальных тем курса.

  • Настройки ретраев, таймаутов и идемпотентности из статей про задачи остаются обязательными.
  • Мониторинг и логи из статьи про наблюдаемость становятся ещё важнее, потому что задач больше и они распределены.
  • LocalExecutor как отправная точка для понимания масштабирования

    LocalExecutor запускает задачи параллельно на той же машине, где работает scheduler.

    Плюсы:

  • минимум инфраструктуры
  • проще дебажить окружение (одна машина)
  • Минусы:

  • ограничение ресурсами одной машины
  • сложнее изолировать зависимости задач: все задачи используют одно окружение Airflow
  • Практическая рекомендация:

  • использовать LocalExecutor для dev и небольшой нагрузки
  • переходить на Celery или Kubernetes, когда:
  • - задач становится много - растут требования к параллельности - нужна изоляция окружений

    CeleryExecutor: распределённые воркеры через брокер

    Идея архитектуры

    CeleryExecutor использует Celery как механизм распределения задач.

    Ключевые элементы:

  • Broker: очередь сообщений, куда scheduler «кладёт» задания (обычно Redis или RabbitMQ)
  • Celery Workers: процессы, которые забирают задания из брокера и выполняют их
  • Metadata DB: хранит состояние (DAG Runs, Task Instances, retries)
  • !Как CeleryExecutor распределяет выполнение задач

    Когда выбирать CeleryExecutor

    CeleryExecutor хорошо подходит, если:

  • у вас нет Kubernetes как стандартной платформы
  • инфраструктура на VM или bare metal
  • вы хотите масштабировать выполнение задач добавлением воркеров
  • у вас много коротких или средних задач, которые удобно «раздавать» воркерам
  • Типичный продакшен-набор:

  • PostgreSQL как metadata DB
  • Redis или RabbitMQ как broker
  • несколько worker-нод
  • Что важно в эксплуатации CeleryExecutor

    Ключевые практические моменты:

  • воркеры должны видеть DAG-код
  • - обычно это общий volume, общий образ или механизм доставки DAG-ов
  • воркеры должны иметь те же зависимости, что и задачи
  • - если задача требует библиотеку, она должна быть в окружении воркера
  • нужно понимать очереди
  • - часто разные классы задач (быстрые, тяжёлые, внешние интеграции) удобно разделять по очередям

    Очереди и изоляция нагрузки

    В Airflow есть понятие очереди исполнения. В CeleryExecutor это особенно полезно:

  • вы можете направлять разные задачи в разные очереди
  • разные очереди могут обрабатываться разными группами воркеров
  • Зачем это нужно:

  • чтобы тяжёлые задачи не «забивали» воркеры для критичных быстрых задач
  • чтобы изолировать интеграции (например, задачи с доступом в приватную сеть)
  • Типичные проблемы CeleryExecutor

  • задачи висят в queued
  • - воркеров мало или они не запущены - брокер недоступен - ограничения параллельности и пулы мешают старту (эти ограничения обсуждаются в операционных настройках Airflow)
  • разные версии кода на scheduler и worker
  • - приводит к ошибкам импорта или несовместимости
  • в окружении воркера нет зависимостей
  • - часто проявляется как падения Python-задач из-за ImportError

    KubernetesExecutor: задача как отдельный Pod

    Идея архитектуры

    KubernetesExecutor запускает каждую Task Instance в отдельном Kubernetes Pod.

    Поток в общих чертах:

  • scheduler решает, что задачу пора запускать
  • executor создаёт Pod в Kubernetes
  • Pod подтягивает образ, запускает задачу, пишет логи
  • статус задачи фиксируется в metadata DB, UI показывает результат
  • !Как KubernetesExecutor запускает каждую задачу в отдельном Pod

    Когда выбирать KubernetesExecutor

    KubernetesExecutor подходит, если:

  • Kubernetes уже является вашей стандартной платформой
  • важна контейнерная изоляция задач
  • нужно масштабирование «по требованию»: pods создаются и исчезают
  • у задач разные зависимости и удобнее решать это образами
  • Практический плюс: вы меньше зависите от «общего окружения воркера», потому что среда выполнения описана контейнером.

    Что важно в модели Kubernetes

    Чтобы задачи стабильно работали в KubernetesExecutor, обычно нужно заранее договориться о трёх вещах.

  • Контейнерный образ
  • - где лежит ваш Airflow runtime и зависимости - как туда попадают DAG-и (в образ, через volume, через git-sync или другой механизм)
  • Доступы и секреты
  • - подключения и ключи не должны попадать в логи - секреты чаще прокидывают через Kubernetes Secrets и конфигурацию Airflow (например, через переменные окружения или secrets backend)
  • Логи
  • - в Kubernetes часто используют удалённое логирование или сбор логов через платформенные инструменты

    Про общий раздел логирования и мониторинга:

  • Logging and Monitoring
  • Типичные проблемы KubernetesExecutor

  • Pod не стартует
  • - не может скачать образ (нет доступа к registry) - неверные права ServiceAccount - не хватает ресурсов на нодах
  • DAG не найден внутри Pod
  • - DAG-и не доставлены в контейнер - неверно настроен путь к dags_folder
  • задача работает локально, но падает в Pod
  • - отличается окружение, переменные, доступы в сеть

    Как выбирать между CeleryExecutor и KubernetesExecutor

    Выбор редко бывает «только техническим», обычно он определяется платформой и операционной зрелостью.

    Быстрый практический ориентир

  • выбирайте CeleryExecutor, если:
  • - у вас VM/on-prem инфраструктура - вы готовы поддерживать broker и пул воркеров - все задачи могут жить в одном типовом окружении воркера

  • выбирайте KubernetesExecutor, если:
  • - Kubernetes уже используется как стандарт - важна изоляция задач и удобство контейнеров - вы хотите создавать окружение задач через образы и декларативные настройки

    Компромиссы, о которых важно помнить

  • CeleryExecutor проще по контейнерной части, но требует поддерживать брокер и согласованность окружения воркеров.
  • KubernetesExecutor снимает часть проблем с окружением, но требует дисциплины в контейнеризации, доступах и доставке DAG-ов.
  • Практический чеклист перед продакшен-деплоем Airflow

  • Metadata DB не SQLite
  • - для продакшена обычно используют PostgreSQL
  • Стратегия доставки DAG-ов определена
  • - общий volume, образ, синхронизация из Git
  • Секреты не в коде
  • - используйте Connections, secrets backend или секреты платформы
  • Логи доступны централизованно
  • - особенно важно для распределённого выполнения
  • Ограничения параллельности осознаны
  • - даже при большом количестве воркеров задачи могут не стартовать из-за лимитов Airflow
  • Есть план обновлений
  • - фиксируйте версии Airflow и провайдеров, обновляйте контролируемо

    Если вы используете Kubernetes, полезная отправная точка для стандартного деплоя:

  • Apache Airflow Helm Chart
  • Итоги

  • Executor определяет, как и где Airflow запускает ваши задачи.
  • CeleryExecutor распределяет выполнение через broker и воркеров и хорошо подходит для VM/on-prem.
  • KubernetesExecutor запускает каждую задачу в отдельном Pod и хорошо подходит для Kubernetes-платформ.
  • В обоих случаях ключ к надёжности остаётся тем же, что и в предыдущих статьях: идемпотентные задачи, корректные ретраи, безопасные Connections/Variables, наблюдаемость и понятная отладка.