티스토리 뷰

반응형

airflow 에서 DAG 의 의존성을 거는 방법은 task 의 set_upstream / set_downstream 메소드를 사용하는 방법이 있다.

하지만, 실제는 연산자 오버로딩이된 >> 혹은 << 을 표현하여 쓰는것이 더 자주 쓰인다. (더 간결해서)

https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html

 

그런데, 다음과 같은 의존성을 만들때, task a, b, c 를 표현하기위해 변수에 대입하여 거는 실수를 하면 안된다.

의도한 DAG 의존성

 

변수 대입을 잘못써서 의존성을 건 케이스

위와 같은 DAG 를 그릴때 아래와 같이 변수에 대입하여 잘못 의존성을 거는 실수를 할 수 있다.

아래와 같이 대입하여 DAG 를 그리면 어떻게 나올까?

mport datetime
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="dag_test",
    start_date=datetime.datetime(2025, 1, 1, tzinfo=pendulum.timezone("Asia/Seoul"))
):
    start = EmptyOperator(task_id="start")
    task_a_1 = EmptyOperator(task_id="task_a_1")
    task_a_2 = EmptyOperator(task_id="task_a_2")
    task_a_3 = EmptyOperator(task_id="task_a_3")
    task_b_1 = EmptyOperator(task_id="task_b_1")
    task_b_2 = EmptyOperator(task_id="task_b_2")
    task_b_3 = EmptyOperator(task_id="task_b_3")
    task_c_1 = EmptyOperator(task_id="task_c_1")
    task_c_2 = EmptyOperator(task_id="task_c_2")
    task_c_3 = EmptyOperator(task_id="task_c_3")    
    end = EmptyOperator(task_id="end")

    # 잘못된 예시
    task_a = task_a_1 >> task_a_2 >> task_a_3
    task_b = task_b_1 >> task_b_2 >> task_b_3
    task_c = task_c_1 >> task_c_2 >> task_c_3
    start >> [task_a, task_b, task_c] >> end

 

의도한것과 다르게 start 가 task_?_1 앞이 아닌 task_?_3 에 붙는다. 왜 그럴까?

결론부터 말하면 의존성을 구성할때 변수대입을 쓰면 안된다. 오해를 없애고 깔끔하게 정리하려면 task_group 으로 묶는게 제일 좋고, 그게 아니라면 왜 그런지 이유를 알아야 한다.

변수 대입해서 하면 엉뚱한 그래프가 그려진다

왜 그럴까?

사실 task 객체의 set_upstream / set_downstream  를 연산자 오버로딩을 통해 사용하기 편하게 랩핑한것뿐이다.

그래서 사실 변수 대입이 필요없이 아래와 같이 표현하면 의도한 의존성을 걸 수 있다.

task_a_1 >> task_a_2 >> task_a_3 
task_b_1 >> task_b_2 >> task_b_3
task_c_1 >> task_c_2 >> task_c_3

start >> [task_a_1, task_b_1, task_c_1]
[task_a_3, task_b_3, task_c_3] >> end

 

task 의 의존성을 걸때 아래와 같은 코드는 최종적으로 task_a 에 task_a_3 가 대입된다.

# task_a 에는 task_a_3 객체가 대입된다
task_a = task_a_1 >> task_a_2 >> task_a_3


"""
>> 라는 연산자는 다음과 같은 형태로 함수가 호출된다고 보면 편하다

def downstream_operator(task1, task2):
    task1.set_downstream(task2)
    return task2

task_a = downstream_operator( downstream_operator(task_a_1, task_a_2), task_a_3)
"""

 

더 좋은건 task_group 을 사용하기

사실 오해도 줄이고, 관리도 편한건 TaskGroup 을 사용하는것이다.

연관된 Task 를 Group 으로 묶으면 그룹을 clear 하여 특정 그룹만 재처리하기도 용이하기 때문이다.

TaskGroup 을 쓴 표현

import datetime
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

with DAG(
    dag_id="dag_test",
    start_date=datetime.datetime(2025, 1, 1, tzinfo=pendulum.timezone("Asia/Seoul")),
):
    start = EmptyOperator(task_id="start")

    with TaskGroup(group_id="task_a") as task_a:
        task_a_1 = EmptyOperator(task_id="task_a_1")
        task_a_2 = EmptyOperator(task_id="task_a_2")
        task_a_3 = EmptyOperator(task_id="task_a_3")
        task_a_1 >> task_a_2 >> task_a_3 

    with TaskGroup(group_id="task_b") as task_b:
        task_b_1 = EmptyOperator(task_id="task_b_1")
        task_b_2 = EmptyOperator(task_id="task_b_2")
        task_b_3 = EmptyOperator(task_id="task_b_3")
        task_b_1 >> task_b_2 >> task_b_3 

    with TaskGroup(group_id="task_c") as task_c:
        task_c_1 = EmptyOperator(task_id="task_c_1")
        task_c_2 = EmptyOperator(task_id="task_c_2")
        task_c_3 = EmptyOperator(task_id="task_c_3")
        task_c_1 >> task_c_2 >> task_c_3 

    end = EmptyOperator(task_id="end")

    start >> [task_a, task_b, task_c] >> end

 

그래서 개인적으로는 TaskGroup 을 써서 구성하는것을 가장 추천한다

반응형
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/04   »
1 2 3 4 5
6 7 8 9 10 11 12
13 14 15 16 17 18 19
20 21 22 23 24 25 26
27 28 29 30
글 보관함