Airflow 공식홈페이지에서 안내하는 Dag 작성 Best practice를 기반으로 평소에 염두해 두고 코드를 작성하기 위해서 정리했다.
참고: https://airflow.apache.org/docs/apache-airflow/2.0.0/best-practices.html#writing-a-dag
🏅 Airflow Best Practice
DAG 작성
- Task 사이의 데이터 전달은 작은 사이즈는 Xcom 을 사용하고, 대용량은 S3/HDFS 같은 외부 저장소를 사용한다.
- Airflow에 등록한 Variable은 Operator의 execute()나 Jinja template 외에서의 사용은 지양한다.
- 불필요한 top level code는 지양한다.
import pendulum
from airflow import DAG
from airflow.decorators import task
import numpy as np # <-- THIS IS A VERY BAD IDEA! DON'T DO THAT!
with DAG(
dag_id="example_python_operator",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
) as dag:
@task()
def print_array():
import numpy as np # <- THIS IS HOW NUMPY SHOULD BE IMPORTED IN THIS CASE!
"""Print Numpy array."""
a = np.arange(15).reshape(3, 5)
print(a)
return a
print_array()
Task 생성
- 멱등성이 보장되어야 한다.
- 가능하면 UPSERT를 사용하고, INSERT 사용 시 멱등성을 보장하도록 유의
- Latest로 데이터를 조회/입력 하지 않고, Read/Write partition을 정확히 할것
- datetime.now()와 같은 현재시간 대신 execution_date 를 사용
- 반복되는 parameter는 공통으로 정의해서 사용한다.
with DAG(
dag_id='airflow_tutorial_2',
default_args=default_args,
schedule_interval=None,
params={
"param1": "value1",
"param2": "value2"
}
) as dag:
bash = BashOperator(
task_id='bash',
bash_command='''
echo {{ params.param1 }} # Output: value1
echo {{ params.param2 }} # Output: value2
'''
)
'Data Engineering' 카테고리의 다른 글
DuckDB를 사용해서 Iceberg 테이블에 쿼리 실행하기 (0) | 2024.09.15 |
---|---|
견고한 데이터 엔지니어링 - 데이터 수집 (0) | 2024.08.18 |
견고한 데이터 엔지니어링 - 원천 시스템에서의 데이터 생성 (0) | 2024.05.26 |
견고한 데이터 엔지니어링 - 우수한 데이터 아키텍처의 원칙 (0) | 2024.05.12 |
견고한 데이터 엔지니어링 - 데이터 엔지니어링이란? (0) | 2024.04.20 |