본문 바로가기
Data Engineering/Airflow

Airflow Rest API를 사용하여 DAG 호출하기

by 홍띠 2024. 1. 13.

API 인증 설정

username, password를 사용해서 인증을 할 수 있도록 airflow.cfg파일에서 아래와 같이 설정한다.

기본 설정값은 session 인증이다.

[api]
auth_backends = airflow.api.auth.backend.basic_auth

현재 설정된 값을 확인하고 싶으면 아래 명령어를 이용한다.

$ airflow config get-value api auth_backends
airflow.api.auth.backend.basic_auth

DAG 파일 작성

호출 할 DAG파일을 작성한다.

아래의 예제는 DAG를 호출 할 때 configuration값을 넘겨서 호출 된 DAG에서 해당 값을 사용할 수 있도록 구성되어 있다.

import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator

dag = DAG(
    dag_id="example_parameterized_dag",
    schedule=None,
    start_date=pendulum.datetime(2024, 1, 12, tz="UTC")
)

parameterized_task = BashOperator(
    task_id="parameterized_task",
    bash_command="echo value: {{ dag_run.conf['conf1'] }}",
    dag=dag,
)

parameterized_task
  • 해당 DAG는 일정한 시간간격이 아닌 호출에 의해서 실행되므로 schedule=None 으로 설정한다.
  • Jinja 템플릿을 이용해서 DAG호출시에 설정한 파라미터 값을 불러와서 사용 할 수 있다.

API로 Dag를 호출하는 코드 작성

DAG를 호출은 동일한 Airflow의 다른 Dag이던, Airflow 외부의 어플리케이션이던 상관없이 가능하다. 다만, 각 환경에 맞게 접근이 가능하도록 (Airflow webserver ↔ 호출 주체)의 IP와 Port의 보안설정이 되어 있어야 한다.

  • 호출 방법
curl -X POST \
  http://<webserver IP>:<webserver port>/api/v1/dags/<DAG_ID>/dagRuns \
  -H 'Content-Type: application/json' \
  --user "username:password" \
  -d '{"conf":"{\"key\":\"value\"}"}'

아래의 예제코드는 Kubernetes Executor를 사용하여 쿠버네티스 클러스터 위에서 동작하는 Airflow의 Dag 내부에서 다른 Dag를 호출하기 위하여 사용한 Task의 파이썬 코드 일부이다.

'''(생략)'''

# Dag 호출을 위한 설정
dag_run_url = "http://<service name>.<namespace>.svc.cluster.local:8080/api/v1/dags/example_parameterized_dag/dagRuns"
headers = {
    "Content-Type": "application/json",
}
# DAG 실행 요청
conf1 = "test"
username = "test_user"
password = "testpw1234"
dag_run_data = {
    "conf": {
        "conf1": f"{conf1}"
    }
}
response = requests.post(dag_run_url, headers=headers, json=dag_run_data, auth=(username, password))
response.raise_for_status()  # HTTP 오류가 발생하면 예외 발생
# 응답 확인
print(f"DAG Run Response: {response.json()}")

'''(생략)'''

 

 

참고:

https://airflow.apache.org/docs/apache-airflow/stable/security/api.html

https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dag-run.html

https://brilliantprogrammer.medium.com/how-to-trigger-airflow-dag-using-rest-api-dd40e3f7a30d