Airflow XCom이란??
Airflow에서 XCom은 Cross-Communication의 약자로 Task간의 데이터를 교환하는 매커니즘이다. XCom을 사용하면 하나의 Task에서 생성된 데이터를 다른 Task에서 사용할 수 있어 Task간의 데이터교환이 가능해진다.
Task에서 XCom에 데이터 주입
- PythonOperator의 return으로 xcom에 값 주입
PythonOperator를 사용하면 자동으로 xcom에 return값이 주입된다.
def get_restaurant_ids():
restaurant_ids = ["1212121212","12121213242"]
return restaurant_ids
- push를 이용한 데이터 주입
def restaurant_ids_push(**context):
restaurant_ids = ["1212121212","12132342421312"]
context['task_instance'].xcom_push(key='restaurant_ids', value=restaurant_ids)
return "xcom_restaurant_ids"
XCom 데이터 추출
- PythonOperator 내부에서 추출
def xcom_pull_test(**context):
xcom_return = context["task_instance"].xcom_pull(task_ids='get_restaurant_ids')
xcom_push_value = context['ti'].xcom_pull(key='restaurant_ids')
print("xcom_return : {}".format(xcom_return))
print("xcom_push_value : {}".format(xcom_push_value))
- Dag파일에서 Jinja 템플릿 사용해서 추출
test_task = PythonOperator(
task_id="test_task",
python_callable=test_function,
op_kwargs={
"restaurant_id_by_task": '{{ task_instance.xcom_pull(task_ids="get_restaurant_ids") }}',
"restaurant_id_by_key": '{{ task_instance.xcom_pull(key="restaurant_ids") }}'
},
)