DevGang
Авторизоваться

Разработка Airflow с помощью Docker

Airflow должен быть легким в работе и развитии.

Новый инструмент astro-cli от астрономов предназначен для помощи пользователям в работе с облаком и не охватывает все рабочие процессы разработки.

Это не очень хорошо работает с докером, потому что для докера недостаточно подготовки в докер-коммуникациях.

Даги, будут расположены в корне проекта в dags.

Даги, которые готовы к фиксации и пойдут в производство, хранятся в dags-production.

Вот пример структуры

/
  dags/
    include/
      helpers/
        company_name/
          module_name/

  dags-production/
    team/
      dags/
        us-east-1/
          include/
            helpers/
          *.dag
        eu-central-1/
            include/
              helpers/
            *.dag

Airflow каждую секунду пишет в свою домашнюю папку. Вот почему следует использовать файловую систему памяти.

Создание виртуальной среды

Выберем использование локальной папки airflow/, чтобы сохранить все библиотеки воздушного потока. У Airflow большое количество зависимостей.

В конце концов, Airflow — это обычный инструмент, и он должен оставаться отдельным от нашего кода.

Если у вас не установлена poetry, сделайте это сейчас.

curl -sSL <https://install.python-poetry.org> | python3 -

Затем подготовим папку airflow

mkdir airflow
echo "airflow" >> .gitignore
sudo mount -t tmpfs -o size=50m tmpfs ./airflow
poetry --directory ./airflow init --name=airflow --description=airflow --author=me --no-interaction

Затем подготовим папку airflow

Установка в виртуальной среде

Мы будем использовать poetry для инициализации новой виртуальной среды из папки ./airflow.

poetry --directory ./airflow shell
cd ..
export PROJECT_HOME=$(realpath ./)
export AIRFLOW_HOME=${PROJECT_HOME}/airflow
export AIRFLOW__CORE__LOAD_EXAMPLES=False
export AIRFLOW__CORE__DAGS_FOLDER=./dags

Теперь давайте установим Airflow, используя файл ограничений

Мы будем следовать инструкциям отсюда https://airflow.apache.org/docs/apache-airflow/stable/start/local.html 

AIRFLOW_VERSION=2.5.0
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"

CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

pip install apache-airflow-providers-docker apache-airflow-providers-amazon
pip install apache-airflow-providers-postgres
pip install apache-airflow-providers-redis
pip install apache-airflow-providers-mysql

Мы удалим все образцы DAG. Мы хотим увидеть наших DAG гораздо быстрее.

Airflow также будет работать быстрее, потому что он не будет анализировать большое количество DAG.

Мы будем искать что-то вроде 

'/home/USER/.cache/pypoetry/virtualenvs/airflow-4vTX1qLp-py3.9'

poetry env info

echo "Show what we will delete"
find /home/guda/.cache/pypoetry/virtualenvs | grep example_dags

echo "Do actual delete"
find /home/guda/.cache/pypoetry/virtualenvs | grep example_dags | xargs rm -rf  "{}"

echo "Or do it that way"
pip show pip
find /home/guda/.cache/pypoetry/virtualenvs/bookings-bQ2s_Hyz-py3.8/lib/python3.8/site-packages | grep example_dags

Если вы забыли удалить пробные даги, будет быстрее удалить базу данных airflow и начать заново.

rm airflow/airflow.db

Наконец, запустите его

airflow standalone

В следующий раз запустите с (пароль находится в файле standalone_admin_password.txt)

Запусr Airflow в следующий раз

Вы всегда можете узнать, какой пароль по умолчанию, запустив

cat airflowstandalone_admin_password.txt

Мы можем создать poetry оболочку в папке ./airflow, а затем запустить airflow standalone или другие команды airflow.

Но быстрее префиксить команды airflow с помощью poetry --directory ./airflow run

export PROJECT_HOME=$(realpath ./)
export AIRFLOW_HOME=${PROJECT_HOME}/airflow
export AIRFLOW__CORE__LOAD_EXAMPLES=False
export AIRFLOW__CORE__DAGS_FOLDER=./dags

poetry --directory ./airflow run airflow standalone
poetry --directory ./airflow run airflow ...

На этом этапе вы сможете получить доступ к пустому воздушному потоку через http://127.0.0.1:8080/

Подготовка к контейнерам

DockerOperator потребуется реестр для извлечения изображений.

В процессе разработки легко поместить образы сборки локально и попросить airflow извлечь их из реестра, настроенного при подключении docker_default.

Нет понимания, почему это называется docker_default вместо image_registry_default, но это уже другая тема.

Давайте запустим локальный реестр.

Работа с реестром

Извлеките и запустите реестр локально.

docker run -d -p 5000:5000 --name registry registry:2

В следующий раз у вас уже будет реестр, и вы должны запустить

docker start registry

Работа с изображениями

Когда вы создадите изображение, пометьте его тегом и нажмите на него.

docker-compose build custom_image

или

docker build .

Push & Pull

docker image tag custom_image localhost:5000/custom_image
docker push localhost:5000/custom_image
docker pull localhost:5000/custom_image

Hints & Tips для отладки локального реестра

Вам нужно указать пароль в airflow - проверьте командную строку подключения.

Пароль мог быть поддельным

docker login 127.0.0.1:5000

Когда вы указываете изображение, не забудьте установить хост примерно так:

    127.0.0.1:5000/custom_image

Список изображений

curl -X GET http://127.0.0.1:5000/v2/_catalog

Остановить реестр

docker container stop registry \
  && docker container rm -v registry

Работа с контейнерами

При работе с контейнерами у вас, вероятно, будет скрипт точки входа. Точка входа предоставит вам интерфейс и позволит запускать только определенный набор команд. Чтобы переопределить точку входа изображения, сделайте это следующим образом:

docker run -u root --entrypoint /bin/bash -ti custom_image:latest

Работа с контейнерами

Начальное значение отлично подходит для размещения в задаче justfile airflow-seed

Заполнение пользователей

poetry --directory ./airflow run airflow users delete --username admin
poetry --directory ./airflow run airflow users create --role Admin --username admin --email admin@example.com --firstname admin --lastname admin --password admin

Заполнение соединений

Вы используете direnv, верно?

poetry --directory ./airflow run \
  airflow connections add 'docker_default' \
    --conn-type 'docker' \
    --conn-login 'root' \
    --conn-host '127.0.0.1' \
    --conn-port '5000'

poetry --directory ./airflow run \
  airflow connections add 'snowflake' \
    --conn-type 'generic' \
    --conn-login '$SNOWFLAKE_USER' \
    --conn-host '$SNOWFLAKE_ACCOUNT' \
    --conn-password '$SNOWFLAKE_PASSWORD'

poetry --directory ./airflow run \
  airflow connections add 'aws' \
    --conn-type 'aws'

Заполнение переменных

poetry --directory ./airflow run \
  airflow variables set aws_region_name us-east-1

poetry --directory ./airflow run \
  airflow variables set current_aws_account NNNNNNNNNNNNNNNN

poetry --directory ./airflow run \
  airflow variables set environment production

poetry --directory ./airflow run \
  airflow variables set airflow_base_url http://localhost:8080/

Dag

Теперь, наконец, вы сможете сделать что-то подобное

from airflow import DAG
from airflow.operators.docker_operator import DockerOperator
from datetime import datetime, timedelta

# Default arguments for the DAG
default_args = {
    'owner': 'me',
    'start_date': datetime(2022, 1, 1),
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Create the DAG
with DAG(
    'hello_world_dag',
    default_args=default_args,
    schedule_interval=timedelta(hours=1),
    catchup=False,
) as dag:
    # Create a task using the DockerOperator
    hello_world_task = DockerOperator(
        task_id='hello_world_task',
        image='localhost:5000/myimage:latest',
        api_version='auto',
        command='echo "hello world"',
        docker_conn_id='local_docker_registry',
    )
#Python #Docker
Комментарии
Чтобы оставить комментарий, необходимо авторизоваться

Присоединяйся в тусовку

В этом месте могла бы быть ваша реклама

Разместить рекламу