본문 바로가기
Data Engineering/Airflow

Airflow에서 custom timetable로 자유롭게 스케쥴 설정하기

by 홍띠 2024. 7. 28.
💡 Airflow version 2.4 이상부터 schedule parameter가 제공된다.

 

Airflow 스케쥴 설정에 주로 사용하는 cron-based 스케쥴로는 한계가 존재하는데, 예를 들어 아래와 같은 상황이 있다.

  • 요일마다 다른 시간에 실행하도록 하기 (e.g. 월요일 4PM, 수요일 2PM)
  • daily 자정 스케쥴이지만 주말은 제외 - 크론표현식으로 스케쥴은 가능하지만 월요일 Dag run에서는 월요일 데이터가 아닌 토/일/월 데이터가 수집됨
  • 뷸규칙적인 간격으로 여러번 실행

위와 같은 특수한 케이스들을 다루기 위해서는 timetable을 이용하면 된다.


Timetables

Timetables 기능은 크론표현식과 timedelta 형태의 스케쥴의 한계를 극복하기위해 Airflow 2.2에 도입되었다.

모든 DAG 스케쥴은 내부적으로 timetable에 의해서 자동적으로 설정되는데, 필요시에는 사용자가 직접 커스터마이징 할 수 있다. Custom Timetables은 Airflow 플러그인으로 등록해서 사용해야 하고, Timetable의 하위 클래스여야 한다.

또한 아래의 메소드를 필수로 설정해서, DataInterval의 시작과 끝을 반환 할 수 있어야 한다.

  • next_dagrun_info: DAG의 규칙적인 스케쥴에 대한 데이터 기간 간격 반환
  • infer_manual_data_interval: DAG 수동 트리거링에서 데이터 기간 간격 반환

커스텀 Timetables의 datetime 반환값들은 모두 “aware” 값이여야 한다. “naive”이면 안되는데, timezone의 유무차이라고 보면 된다. 예를들어, timezone 포함해야 한다. 더해서, pendulum datetime과 timezone 타입을 사용해야한다.

 

Custom Timetable 작성

https://airflow.apache.org/docs/apache-airflow/stable/howto/timetable.html
예제를 참고해서 작성하였으며, UTC를 KST로 변경하고 미국공휴일을 적용하는 부분을 제거했다. 평일 자정에 실행되는 Daily DAG 스케쥴로, 평일 데이터를 수집하기 위함 → 월요일에 주말 데이터가 포함되어서는 안되기 때문에 Custom Timetables 작성

 

위에서 설명한대로 Airflow의 Timetable을 상속받아서 subclass로 Timetable을 작성하고, 이를 Airflow Plugin으로 등록해서 사용해야 한다.

  • Custom Timetable Class 기본 포맷
from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import Timetable


class WorkdayTimetable(Timetable):
		# 여기에 Data Interval을 반환하는 메서드들을 작성해서 원하는 결과를 반환하도록 구현한다.
    pass


class WorkdayTimetablePlugin(AirflowPlugin):
    name = "workday_timetable_plugin"
    timetables = [WorkdayTimetable]

 

  • 파일 구조 - Airflow에 정의된 plugin 폴더 위치에 맞게 위치해야함
dags
└── test_dag.py
plugins
├── __init__.py
├── timetables
    ├── __init__.py
    └── workday_timetable.py
  • Airflow에서 기본적으로 플러그인은 lazy loaded 되도록 설정되어서, 한번 load되면 다시 불러오지 않는다. Airflow 프로세스가 시작할때마다 새로 load 하기 위해서는 airflow.cfg 에서 아래 설정을 False 로 변경해주어야 한다.
lazy_load_plugins = False
  • 사용자 정의 스케쥴링 로직 적용
import logging
from pendulum import Date, DateTime, Time, timezone, today

from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable, TimeRestriction
log = logging.getLogger(__name__)

KST = timezone("Asia/Seoul")

class WorkdayTimetable(Timetable):
    def get_next_workday(self, d: DateTime, incr=1) -> DateTime:
        next_start = d
        while True:
            if next_start.weekday() not in (5, 6):  # not on weekend
                break
            next_start = next_start.add(days=incr)
        return next_start

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        """
        :param run_after: Dag가 트리거된 시간 (Type: pendulum.Datetime). UTC 타임존
        :return: DataInterval start, end
        """
        # run_after의 전날 자정을 start로 잡으면서, 주말을 제외하도록 한번 더 처리 (이때 run_after는 KST로 변환해서 처리)
        start = DateTime.combine((run_after.in_timezone(KST).add(days=-1)).date(), Time.min).replace(tzinfo=KST)
        start = self.get_next_workday(start, incr=-1)
        return DataInterval(start=start, end=start.add(days=1))
    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        """
        :param last_automated_data_interval: DataInterval instance indicating the data interval of this DAG’s previous non-manually-triggered run
        :param restriction: restriction encapsulates how the DAG and its tasks specify the schedule, and contains three attributes(earliest, latest, catchup)
        :return: DagRunInfo(start, end) 혹은 None
        """
        # Regular Schedule로 실행된 previous run이 존재하는 경우
        if last_automated_data_interval is not None:
            last_start = last_automated_data_interval.start.in_timezone(KST)  # KST로 변환해서 last_start 받아옴
            next_start = DateTime.combine((last_start.add(days=1)).date(), Time.min)
        # Regular Schedule에서 first ever run 인 경우
        elif (earliest := restriction.earliest.in_timezone(KST)) is None:  # start_date가 지정 안되어 있는 경우
            return None  # start_date가 없으므로, 스케쥴하지 않음
        elif not restriction.catchup:  # catchup = False의 경우, today가 earliest가 될 수 있음
            next_start = max(earliest, DateTime.combine(today(KST), Time.min))
        elif earliest.time() != Time.min:  # start_date가 자정으로 되어있지 않은 경우, 다음날로 skip
            next_start = DateTime.combine(earliest.add(days=1), Time.min)
        else:
            next_start = earliest
        # 주말인 경우 평일로 skip
        next_start = self.get_next_workday(next_start.replace(tzinfo=KST))
        # latest가 next_start 보다 크면 스케쥴 중지
        if restriction.latest is not None and next_start > restriction.latest.in_timezone(KST):
            return None
        return DagRunInfo.interval(start=next_start, end=next_start.add(days=1))


class WorkdayTimetablePlugin(AirflowPlugin):
    name = "workday_timetable_plugin"
    timetables = [WorkdayTimetable]
  • Dag에서 CustomTimetable 호출해서 사용
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from workday_timetable import WorkdayTimetable

KST = pendulum.timezone("Asia/Seoul")

with DAG(
    dag_id='workday_timetable_test',
    start_date=pendulum.datetime(2024, 7, 1, tz=KST),
    schedule=WorkdayTimetable(),
    catchup=True
) as dag:
    '''
    variables 값 가져오기
    '''
    # BashOperator를 사용하여 Jinja 템플릿 변수를 출력
    print_jinja_variable = BashOperator(
        task_id='print_jinja_variable',
        bash_command='''echo "data_interval_start: {{ data_interval_start }}"
                        echo "data_interval_end: {{ data_interval_end }}"
                        echo "logical_date: {{ logical_date }}"
                    '''
    )
  • 결과

Custom TimeTable 제한사항

  • Timetable은 호출마다 동일한 결과값이 반환되어야 한다. 이 메서드는 event-based triggering을 위해서 구현되지 않았다.
  • Timetable은 DAG Run이 생성되면 파싱되므로, 너무 느리거나 긴 코드는 Airflow 성능에 영향을 끼칠수 있으므로 지양한다.