본문 바로가기
Data Engineering

Airflow Dag 작성 Best practice

by 홍띠 2023. 11. 5.

Airflow 공식홈페이지에서 안내하는 Dag 작성 Best practice를 기반으로 평소에 염두해 두고 코드를 작성하기 위해서 정리했다.

참고: https://airflow.apache.org/docs/apache-airflow/2.0.0/best-practices.html#writing-a-dag

🏅 Airflow Best Practice


DAG 작성

  • Task 사이의 데이터 전달은 작은 사이즈는 Xcom 을 사용하고, 대용량은 S3/HDFS 같은 외부 저장소를 사용한다.
  • Airflow에 등록한 Variable은 Operator의 execute()나 Jinja template 외에서의 사용은 지양한다.
  • 불필요한 top level code는 지양한다.
import pendulum

from airflow import DAG
from airflow.decorators import task

import numpy as np  # <-- THIS IS A VERY BAD IDEA! DON'T DO THAT!

with DAG(
    dag_id="example_python_operator",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:

    @task()
    def print_array():
				import numpy as np  # <- THIS IS HOW NUMPY SHOULD BE IMPORTED IN THIS CASE!
        """Print Numpy array."""
				a = np.arange(15).reshape(3, 5)
        print(a)
        return a

    print_array()

Task 생성

  • 멱등성이 보장되어야 한다.
    • 가능하면 UPSERT를 사용하고, INSERT 사용 시 멱등성을 보장하도록 유의
    • Latest로 데이터를 조회/입력 하지 않고, Read/Write partition을 정확히 할것
    • datetime.now()와 같은 현재시간 대신 execution_date 를 사용
  • 반복되는 parameter는 공통으로 정의해서 사용한다.
with DAG(
    dag_id='airflow_tutorial_2',
    default_args=default_args,
    schedule_interval=None,
    params={
        "param1": "value1",
        "param2": "value2"
    }
) as dag:
    bash = BashOperator(
        task_id='bash',
        bash_command='''
            echo {{ params.param1 }} # Output: value1
            echo {{ params.param2 }} # Output: value2
        '''
    )