티스토리 뷰

반응형

ETL 작업을 구성하다보면, 배치주기가 다양하게 구성하는 경우가 많다.

특히, raw 로그를 처리하는건 10분 단위와 같이 짧게 유지하고, 뒤쪽 데이터 가공하는건 1시간이나 1일 단위로 배치 사이클을 더 넓게 가져가야하는 경우가 종종 생긴다. 만약, NamedHivePartitionSensor 를 사용하는 경우는 N개의 파티션 이름을 나열해서 쉽게 해결이 가능하다. 그렇다면 ExternalTaskSensor 에서는 이런 문제를 어떻게 해결할 수 있을까?

check_event_log = NamedHivePartitionSensor(
    task_id='check_event_log',
    partition_names=[
        "log.event_log/ymd={{ ymd(dag_run.logical_date) }}/hh24mi={{ hh24(dag_run.logical_date) }}00",
        "log.event_log/ymd={{ ymd(dag_run.logical_date) }}/hh24mi={{ hh24(dag_run.logical_date) }}10",
        "log.event_log/ymd={{ ymd(dag_run.logical_date) }}/hh24mi={{ hh24(dag_run.logical_date) }}20",
        "log.event_log/ymd={{ ymd(dag_run.logical_date) }}/hh24mi={{ hh24(dag_run.logical_date) }}30",
        "log.event_log/ymd={{ ymd(dag_run.logical_date) }}/hh24mi={{ hh24(dag_run.logical_date) }}40",
        "log.event_log/ymd={{ ymd(dag_run.logical_date) }}/hh24mi={{ hh24(dag_run.logical_date) }}50"
    ],
    timeout=2 * 60 * 60, # 2 시간 대기
    poke_interval=5 * 60, # 5분 마다 체크
    metastore_conn_id=METASTORE_CONN_ID
)

 

참고로, NamedHivePartitionSensor 에서 ExternalTaskSensor 로 바꾸려고 한 이유는 airflow 의 UI 에서 DAG 간 의존성을 확인할 수 있기 때문에 바꿔보려고 했던것이고, 스케쥴 간격이 다른 의존성문제를 어떻게 풀었는지 알리고자 간단히 정리했다.

 

해결방법

우선 airflow 2.x 버전대를 기준으로 설명하겠으며, ExternalTaskSensor 의 인자값을 확인해보면 execution_delta 라는 인자값이 있는데, 여기서 스케쥴을 역산해서 동일한 값으로 만들도록 유도하면 된다. 그림으로 풀어내면 아래와 같다.

 

dag_1H_sample 에서 아래와 같이 의존성을 걸면 위와 같은 디펜던시 구조로 사이클 연결이 가능하다.

check_event_log = []
for delta_minute in (0, 10, 20, 30, 40, 50):
    wait = ExternalTaskSensor(
        task_id=f'check_event_log_{delta_minute:02}',
        external_dag_id="dag_10M_sample",
        execution_delta=timedelta(minutes=0-delta_minute)
    )
    check_event_log.append(wait)

wait = DummyOperator(task_id='wait')

...
check_conversion_log >> wait >> [ ... ]
반응형
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함