Разработка Airflow с помощью Docker
Airflow должен быть легким в работе и развитии.
Новый инструмент astro-cli от астрономов предназначен для помощи пользователям в работе с облаком и не охватывает все рабочие процессы разработки.
Это не очень хорошо работает с докером, потому что для докера недостаточно подготовки в докер-коммуникациях.
Даги, будут расположены в корне проекта в dags.
Даги, которые готовы к фиксации и пойдут в производство, хранятся в dags-production.
Вот пример структуры
/
dags/
include/
helpers/
company_name/
module_name/
dags-production/
team/
dags/
us-east-1/
include/
helpers/
*.dag
eu-central-1/
include/
helpers/
*.dag
Airflow каждую секунду пишет в свою домашнюю папку. Вот почему следует использовать файловую систему памяти.
Создание виртуальной среды
Выберем использование локальной папки airflow/, чтобы сохранить все библиотеки воздушного потока. У Airflow большое количество зависимостей.
В конце концов, Airflow — это обычный инструмент, и он должен оставаться отдельным от нашего кода.
Если у вас не установлена poetry, сделайте это сейчас.
curl -sSL <https://install.python-poetry.org> | python3 -
Затем подготовим папку airflow
mkdir airflow
echo "airflow" >> .gitignore
sudo mount -t tmpfs -o size=50m tmpfs ./airflow
poetry --directory ./airflow init --name=airflow --description=airflow --author=me --no-interaction
Затем подготовим папку airflow
Установка в виртуальной среде
Мы будем использовать poetry для инициализации новой виртуальной среды из папки ./airflow.
poetry --directory ./airflow shell
cd ..
export PROJECT_HOME=$(realpath ./)
export AIRFLOW_HOME=${PROJECT_HOME}/airflow
export AIRFLOW__CORE__LOAD_EXAMPLES=False
export AIRFLOW__CORE__DAGS_FOLDER=./dags
Теперь давайте установим Airflow, используя файл ограничений
Мы будем следовать инструкциям отсюда https://airflow.apache.org/docs/apache-airflow/stable/start/local.html
AIRFLOW_VERSION=2.5.0
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
pip install apache-airflow-providers-docker apache-airflow-providers-amazon
pip install apache-airflow-providers-postgres
pip install apache-airflow-providers-redis
pip install apache-airflow-providers-mysql
Мы удалим все образцы DAG. Мы хотим увидеть наших DAG гораздо быстрее.
Airflow также будет работать быстрее, потому что он не будет анализировать большое количество DAG.
Мы будем искать что-то вроде
'/home/USER/.cache/pypoetry/virtualenvs/airflow-4vTX1qLp-py3.9'
poetry env info
echo "Show what we will delete"
find /home/guda/.cache/pypoetry/virtualenvs | grep example_dags
echo "Do actual delete"
find /home/guda/.cache/pypoetry/virtualenvs | grep example_dags | xargs rm -rf "{}"
echo "Or do it that way"
pip show pip
find /home/guda/.cache/pypoetry/virtualenvs/bookings-bQ2s_Hyz-py3.8/lib/python3.8/site-packages | grep example_dags
Если вы забыли удалить пробные даги, будет быстрее удалить базу данных airflow и начать заново.
rm airflow/airflow.db
Наконец, запустите его
airflow standalone
В следующий раз запустите с (пароль находится в файле standalone_admin_password.txt)
Запусr Airflow в следующий раз
Вы всегда можете узнать, какой пароль по умолчанию, запустив
cat airflowstandalone_admin_password.txt
Мы можем создать poetry оболочку в папке ./airflow
, а затем запустить airflow standalone или другие команды airflow.
Но быстрее префиксить команды airflow с помощью poetry --directory ./airflow run
export PROJECT_HOME=$(realpath ./)
export AIRFLOW_HOME=${PROJECT_HOME}/airflow
export AIRFLOW__CORE__LOAD_EXAMPLES=False
export AIRFLOW__CORE__DAGS_FOLDER=./dags
poetry --directory ./airflow run airflow standalone
poetry --directory ./airflow run airflow ...
На этом этапе вы сможете получить доступ к пустому воздушному потоку через http://127.0.0.1:8080/
Подготовка к контейнерам
DockerOperator потребуется реестр для извлечения изображений.
В процессе разработки легко поместить образы сборки локально и попросить airflow извлечь их из реестра, настроенного при подключении docker_default.
Нет понимания, почему это называется docker_default вместо image_registry_default, но это уже другая тема.
Давайте запустим локальный реестр.
Работа с реестром
Извлеките и запустите реестр локально.
docker run -d -p 5000:5000 --name registry registry:2
В следующий раз у вас уже будет реестр, и вы должны запустить
docker start registry
Работа с изображениями
Когда вы создадите изображение, пометьте его тегом и нажмите на него.
docker-compose build custom_image
или
docker build .
Push & Pull
docker image tag custom_image localhost:5000/custom_image
docker push localhost:5000/custom_image
docker pull localhost:5000/custom_image
Hints & Tips для отладки локального реестра
Вам нужно указать пароль в airflow - проверьте командную строку подключения.
Пароль мог быть поддельным
docker login 127.0.0.1:5000
Когда вы указываете изображение, не забудьте установить хост примерно так:
127.0.0.1:5000/custom_image
Список изображений
curl -X GET http://127.0.0.1:5000/v2/_catalog
Остановить реестр
docker container stop registry \
&& docker container rm -v registry
Работа с контейнерами
При работе с контейнерами у вас, вероятно, будет скрипт точки входа. Точка входа предоставит вам интерфейс и позволит запускать только определенный набор команд. Чтобы переопределить точку входа изображения, сделайте это следующим образом:
docker run -u root --entrypoint /bin/bash -ti custom_image:latest
Работа с контейнерами
Начальное значение отлично подходит для размещения в задаче justfile airflow-seed
Заполнение пользователей
poetry --directory ./airflow run airflow users delete --username admin
poetry --directory ./airflow run airflow users create --role Admin --username admin --email admin@example.com --firstname admin --lastname admin --password admin
Заполнение соединений
Вы используете direnv, верно?
poetry --directory ./airflow run \
airflow connections add 'docker_default' \
--conn-type 'docker' \
--conn-login 'root' \
--conn-host '127.0.0.1' \
--conn-port '5000'
poetry --directory ./airflow run \
airflow connections add 'snowflake' \
--conn-type 'generic' \
--conn-login '$SNOWFLAKE_USER' \
--conn-host '$SNOWFLAKE_ACCOUNT' \
--conn-password '$SNOWFLAKE_PASSWORD'
poetry --directory ./airflow run \
airflow connections add 'aws' \
--conn-type 'aws'
Заполнение переменных
poetry --directory ./airflow run \
airflow variables set aws_region_name us-east-1
poetry --directory ./airflow run \
airflow variables set current_aws_account NNNNNNNNNNNNNNNN
poetry --directory ./airflow run \
airflow variables set environment production
poetry --directory ./airflow run \
airflow variables set airflow_base_url http://localhost:8080/
Dag
Теперь, наконец, вы сможете сделать что-то подобное
from airflow import DAG
from airflow.operators.docker_operator import DockerOperator
from datetime import datetime, timedelta
# Default arguments for the DAG
default_args = {
'owner': 'me',
'start_date': datetime(2022, 1, 1),
'depends_on_past': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# Create the DAG
with DAG(
'hello_world_dag',
default_args=default_args,
schedule_interval=timedelta(hours=1),
catchup=False,
) as dag:
# Create a task using the DockerOperator
hello_world_task = DockerOperator(
task_id='hello_world_task',
image='localhost:5000/myimage:latest',
api_version='auto',
command='echo "hello world"',
docker_conn_id='local_docker_registry',
)