본문 바로가기
카테고리 없음

Airflow - XCom으로 다음 Task에 데이터 넘기기

by 홍띠 2024. 6. 2.

Airflow XCom이란??

Airflow에서 XCom은 Cross-Communication의 약자로 Task간의 데이터를 교환하는 매커니즘이다. XCom을 사용하면 하나의 Task에서 생성된 데이터를 다른 Task에서 사용할 수 있어 Task간의 데이터교환이 가능해진다.

Task에서 XCom에 데이터 주입

  • PythonOperator의 return으로 xcom에 값 주입
    PythonOperator를 사용하면 자동으로 xcom에 return값이 주입된다.
def get_restaurant_ids():
    restaurant_ids = ["1212121212","12121213242"]
    return restaurant_ids
  • push를 이용한 데이터 주입
def restaurant_ids_push(**context):
    restaurant_ids = ["1212121212","12132342421312"]
    context['task_instance'].xcom_push(key='restaurant_ids', value=restaurant_ids)
    return "xcom_restaurant_ids"

 

XCom 데이터 추출

  • PythonOperator 내부에서 추출
def xcom_pull_test(**context):
    xcom_return = context["task_instance"].xcom_pull(task_ids='get_restaurant_ids')
    xcom_push_value = context['ti'].xcom_pull(key='restaurant_ids')

    print("xcom_return : {}".format(xcom_return))
    print("xcom_push_value : {}".format(xcom_push_value))
  • Dag파일에서 Jinja 템플릿 사용해서 추출
test_task = PythonOperator(
    task_id="test_task",
    python_callable=test_function,
    op_kwargs={
        "restaurant_id_by_task": '{{ task_instance.xcom_pull(task_ids="get_restaurant_ids") }}',
        "restaurant_id_by_key": '{{ task_instance.xcom_pull(key="restaurant_ids") }}'
    },
)