티스토리 뷰
Airflow 의존성 사용 방법 알아보기 : set_upstream / set_downstream / task_group
정선생 2025. 3. 31. 19:00airflow 에서 DAG 의 의존성을 거는 방법은 task 의 set_upstream / set_downstream 메소드를 사용하는 방법이 있다.
하지만, 실제는 연산자 오버로딩이된 >> 혹은 << 을 표현하여 쓰는것이 더 자주 쓰인다. (더 간결해서)
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html
그런데, 다음과 같은 의존성을 만들때, task a, b, c 를 표현하기위해 변수에 대입하여 거는 실수를 하면 안된다.

변수 대입을 잘못써서 의존성을 건 케이스
위와 같은 DAG 를 그릴때 아래와 같이 변수에 대입하여 잘못 의존성을 거는 실수를 할 수 있다.
아래와 같이 대입하여 DAG 를 그리면 어떻게 나올까?
mport datetime
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
with DAG(
dag_id="dag_test",
start_date=datetime.datetime(2025, 1, 1, tzinfo=pendulum.timezone("Asia/Seoul"))
):
start = EmptyOperator(task_id="start")
task_a_1 = EmptyOperator(task_id="task_a_1")
task_a_2 = EmptyOperator(task_id="task_a_2")
task_a_3 = EmptyOperator(task_id="task_a_3")
task_b_1 = EmptyOperator(task_id="task_b_1")
task_b_2 = EmptyOperator(task_id="task_b_2")
task_b_3 = EmptyOperator(task_id="task_b_3")
task_c_1 = EmptyOperator(task_id="task_c_1")
task_c_2 = EmptyOperator(task_id="task_c_2")
task_c_3 = EmptyOperator(task_id="task_c_3")
end = EmptyOperator(task_id="end")
# 잘못된 예시
task_a = task_a_1 >> task_a_2 >> task_a_3
task_b = task_b_1 >> task_b_2 >> task_b_3
task_c = task_c_1 >> task_c_2 >> task_c_3
start >> [task_a, task_b, task_c] >> end
의도한것과 다르게 start 가 task_?_1 앞이 아닌 task_?_3 에 붙는다. 왜 그럴까?
결론부터 말하면 의존성을 구성할때 변수대입을 쓰면 안된다. 오해를 없애고 깔끔하게 정리하려면 task_group 으로 묶는게 제일 좋고, 그게 아니라면 왜 그런지 이유를 알아야 한다.

왜 그럴까?
사실 task 객체의 set_upstream / set_downstream 를 연산자 오버로딩을 통해 사용하기 편하게 랩핑한것뿐이다.
그래서 사실 변수 대입이 필요없이 아래와 같이 표현하면 의도한 의존성을 걸 수 있다.
task_a_1 >> task_a_2 >> task_a_3
task_b_1 >> task_b_2 >> task_b_3
task_c_1 >> task_c_2 >> task_c_3
start >> [task_a_1, task_b_1, task_c_1]
[task_a_3, task_b_3, task_c_3] >> end
task 의 의존성을 걸때 아래와 같은 코드는 최종적으로 task_a 에 task_a_3 가 대입된다.
# task_a 에는 task_a_3 객체가 대입된다
task_a = task_a_1 >> task_a_2 >> task_a_3
"""
>> 라는 연산자는 다음과 같은 형태로 함수가 호출된다고 보면 편하다
def downstream_operator(task1, task2):
task1.set_downstream(task2)
return task2
task_a = downstream_operator( downstream_operator(task_a_1, task_a_2), task_a_3)
"""
더 좋은건 task_group 을 사용하기
사실 오해도 줄이고, 관리도 편한건 TaskGroup 을 사용하는것이다.
연관된 Task 를 Group 으로 묶으면 그룹을 clear 하여 특정 그룹만 재처리하기도 용이하기 때문이다.

import datetime
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup
with DAG(
dag_id="dag_test",
start_date=datetime.datetime(2025, 1, 1, tzinfo=pendulum.timezone("Asia/Seoul")),
):
start = EmptyOperator(task_id="start")
with TaskGroup(group_id="task_a") as task_a:
task_a_1 = EmptyOperator(task_id="task_a_1")
task_a_2 = EmptyOperator(task_id="task_a_2")
task_a_3 = EmptyOperator(task_id="task_a_3")
task_a_1 >> task_a_2 >> task_a_3
with TaskGroup(group_id="task_b") as task_b:
task_b_1 = EmptyOperator(task_id="task_b_1")
task_b_2 = EmptyOperator(task_id="task_b_2")
task_b_3 = EmptyOperator(task_id="task_b_3")
task_b_1 >> task_b_2 >> task_b_3
with TaskGroup(group_id="task_c") as task_c:
task_c_1 = EmptyOperator(task_id="task_c_1")
task_c_2 = EmptyOperator(task_id="task_c_2")
task_c_3 = EmptyOperator(task_id="task_c_3")
task_c_1 >> task_c_2 >> task_c_3
end = EmptyOperator(task_id="end")
start >> [task_a, task_b, task_c] >> end
그래서 개인적으로는 TaskGroup 을 써서 구성하는것을 가장 추천한다
'데이터처리 > Airflow' 카테고리의 다른 글
[Airflow] 월말에 실행되는 배치 사이클 구성하기 - 28,29,30,31일 문제 (0) | 2025.02.06 |
---|---|
[Airflow] logical_date 한국시간으로 맞춰서 실행하기 (0) | 2025.02.05 |
[Airflow] logical_date 를 타임존에 맞게 변환해서 쓰기 : custom filter 추가 (0) | 2025.02.04 |
[Airflow] crontab 표현시, 일부 날짜가 스케쥴이 안되는 문제 - 타임존영향 (0) | 2025.02.03 |
Airflow 2.x 의 주요 퍼미션별 권한 6개 상세 설명 : 캡쳐포함 (1) | 2024.08.28 |