티스토리 뷰

반응형

airflow 에서 logical_date 와 schedule 을 고려하면 월배치를 돌리려면 말일 기준의 logical_date 기준으로 배치가 돌아야 한다. 하지만 다 알고 있는것처럼 월말은 1월은 31일이요, 2월은 28 혹은 29일, 3월은 31일 등등 이런식이라 처리하기가 어렵다.

제대로 처리하려면 Airflow 2.4.x 이상버전에서는 Timetables 을 직접 구현하는게 가장 베스트인걸로 보인다.

하지만, 여기서는 AirflowSkipException 과 schedule 의 crontab 표현을 이용해 해결하는 방법을 알려주고자 한다.

 

쉽게 생각해서 crontab 에서 매월 28,29,30,31 에 스케쥴을 활성화 하고,

월말을 계산해서 아닌날은 스케쥴을 넘기는 형태로 해결하는 방법이다.

 

해결방법

바로 샘플코드로 넘어가겠다. 위에 설명한것처럼 if_check 태스크는 logical_date 에서 타임존세팅을 고려한 값으로 변환하고 이를 월말의 마지막날 값을 계산하고 이값이 같은지를 비교하는 로직을 넣었다.

 

28,29,30,31 모두 스케쥴이 활성화 되지만 마지막날이 아니라면 AirflowSkipException 을 유도해서 그 뒤에 있는 태스크는 실행안해서 실제 마지막날에만 쿼리를 실행하는 형태로 해결한 코드이다.

 

약간 불필요한 스케쥴이 도는 단점이 있긴하지만, DAG 에 코드를 추가해서 해결할수 있는 가장 심플한 방법이 아닌가 싶다.

import datetime
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator
from airflow.timetables.trigger import CronTriggerTimetable
from airflow.operators.python import PythonOperator
from airflow.exceptions import AirflowSkipException


with DAG(
    dag_id="monthly_last_day",
    start_date=datetime.datetime(2024, 12, 1, tzinfo=pendulum.timezone("Asia/Seoul")),
    schedule=CronTriggerTimetable("0 0 28-31 * *", timezone="Asia/Seoul"),
    max_active_runs=1,  # 동시에 하나의 DAG만 실행
    access_control={"ad-bi": ["can_read", "can_edit"]},
):
    start = EmptyOperator(task_id="start")


    def check_condition(logical_date, **kwargs):
        from airflow.settings import TIMEZONE
        import calendar
        basetime = logical_date.astimezone(TIMEZONE) if logical_date else None
        res = calendar.monthrange(basetime.year, basetime.month)
        day = res[1]
        last_date_of_month = ("%04d-%02d-%02d" % (basetime.year, basetime.month, day))
        if last_date_of_month != basetime.strftime('%Y-%m-%d'):
            raise AirflowSkipException("[SKIP] is not Last Month Day")
        else:
            print(f"{basetime} is Last Month Day")

    if_check = PythonOperator(task_id='check_monthly_last_day', python_callable=check_condition, provide_context=True)

    table = BashOperator(
        task_id="table",
        bash_command="echo 'select * from t_sample where ymd={{ logical_date | ds }}'",
    )

    end = EmptyOperator(task_id="end")

    start >> if_check >> table >> end

 

나중에는 월단위 배치를 위한 스케쥴을 처리하기위해서 코드를 만들어 봐야겠다. 

뭔가 airflow 는 유명한것 대비 뭔가 쓰다보면 아쉬운게 있는것 같다.

반응형
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/02   »
1
2 3 4 5 6 7 8
9 10 11 12 13 14 15
16 17 18 19 20 21 22
23 24 25 26 27 28
글 보관함