티스토리 뷰
airflow 에서는 code 탭에서 dag 를 구성한 python 파일을 볼 수 있다. 그런데, 이 파일과 dag 값이 다르게 로딩되서 한참 삽질한 경험을 공유하고자 한다. 결론부터 말하면 dag 를 선언한 파일에서 다른 dag 의 파일을 import 하면서 영향을 받았다.
원인 확인방법
airflow 에서는 dag 파일을 읽고, dag bag 에 담아서 관리된다. 문제는 이때 관련된 정보가 잘못 인지되었던 문제였다.
dag 로딩이 잘되었는지는 airflow dags list 명령으로 확인이 가능하다.
원래 primitive.py, hour.py, day.py 3개의 파일이 따로 존재하고, dag 도 파일별로 따로 선언했는데 아래와 같이 primitive.py 경로가 아닌 hour.py 로 잘못잡히는걸 확인할 수 있었다. 로딩시점에 따라서 재현이 안될수도 있는데 파일이 늘어가다보면 로딩순서가 어긋나면 재현될 수 있으므로 미리 알아두도록 하자.
$ airflow dags list
Error: Failed to load all files. For details, run `airflow dags list-import-errors`
dag_id | filepath | owner | paused
========================+===============================+===========+=======
day_v1.0.0 | sample/day.py | sample | True
hour_v1.0.0 | sample/hour.py | sample | True
primivie_v1.0.0 | sample/hour.py | sample | True
$ airflow dags list-import-errors
filepath | error
===============================================+==================================================================================================================================================================================
/home/foo/airflow/dags/sample/primitive.py | AirflowDagDuplicatedIdException: Ignoring DAG mydag_primivie_v1.0.0 from /home/foo/airflow/dags/sample/primitive.py - also found in /home/foo/airflow/dags/sample/hour.py
문제와 해결
이런 문제가 일어났던 이유는 external task sensor 를 사용할때 external_task_id 가 변경될때 실수하는걸 피하기위해서,
DAG_ID 가 선언된 dag 파일을 import 했는데, 이게 문제였다.
import 하게되면 해당 파일을 읽게되고, 이때 해당 파일을 로딩하면서 그 안에서의 DAG 로직을 로딩하면서 dag bag 정보에 담을때 꼬이는 문제가 발생된것이었다.
즉, DAG 로직이 선언된 파일끼리는 절대 서로 해당 파일을 import 해서는 안된다.
이렇게 참조하게되면 import 된 파일을 읽으면서 파일애늬 DAG 가 중복로딩되어 불필요하게 로딩을 하고, 결과적으로는 filepath 정보가 잘못 인지되어 code 를 확인하려고 할때 엉뚱한 py 파일로 연결 되는 문제가 발생되었다.
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime, timedelta
import pendulum
DAG_ID = "primivie_v1.0.0"
local_tz = pendulum.timezone("Asia/Seoul")
with DAG (
dag_id=DAG_ID,
default_args={'owner': 'sample'},
start_date=datetime(2023, 12, 11, 0, tzinfo=local_tz),
) as dag:
EmptyOperator(task_id='start')
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime, timedelta
import pendulum
from sample.primitive import DAG_ID as PARENT_ID <-- 이게 문제임
DAG_ID = "hour_v1.0.0"
local_tz = pendulum.timezone("Asia/Seoul")
with DAG (
dag_id=DAG_ID,
default_args={'owner': 'sample'},
start_date=datetime(2023, 12, 11, 0, tzinfo=local_tz),
) as dag:
EmptyOperator(task_id='start')
extermal_task_id 를 참조하기위해서 dag_id 의 상수값만 따로 관리하던지... 다른 방법을 적용해야 한다는걸 결국 깨달았다.
마무리
사실 dag 를 선언한 파일의 root 영역에는 무언가 선언하지 않는것이 좋다고 가이드 되어있다.
https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html
DAG 의 표현을 위해 필요한 기본적인 참조를 제외하면 (보통 airflow 나 python 기본 라이브러리가 아닌)
최상단 import 가 아니라, DAG 블럭안쪽의 task 영역에 import 를 선언하는걸 추천하고 있었다.
위 문서는 꼭 읽어보도록 하자.
그리고 dag 를 if __name__ main 같은 조건을 써서 선언하는것도 생각할수 있는데 (import 시점의 로딩여부를 막을수 있겠다는 생각에...)
그렇게 하면 dag 자체가 로딩안될수 있나보다
https://stackoverflow.com/questions/62327923/dag-is-not-visible-on-airflow-ui
'데이터처리 > Airflow' 카테고리의 다른 글
[AIRFLOW] Variable.get() 의 성능 문제 해결방법 - 환경변수 활용법 (0) | 2023.12.15 |
---|---|
[오류] Airflow 에서 logical_date 가 잘못 생성되는 문제 - catchup 문제 (0) | 2023.12.14 |
[AIRFLOW] BashOperator 에서 시스템 환경변수가 로딩안되는 문제 - env 지정시 (0) | 2023.12.11 |
[AIRFLOW] logical_date 의 개념 이해하기- execution_date 의 대체 (1) | 2023.12.05 |
[AIRFLOW] ExternalTaskSensor 에서 스케쥴 단위가 다를때 의존성 거는법 (예: 10분->1시간) (0) | 2023.11.13 |