티스토리 뷰
airflow 의 best practices 글을 읽어보면, Variable.get(키) 형태로 직접 값을 가져오지 않고, 바인딩처리해서 {{var.value.키}} 표현해서 사용하는것을 기본 가이드로 하고 있다. 그 이유는 일단 DAG 를 구성할때 top level code 에 관련된 로직이 존재한다면 DAG 실행뿐 아니라, 로딩되는 시점에서도 그 코드가 동작되기 때문에 성능에 문제가 될 수 있다.
https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#airflow-variables
from airflow.models import Variable
# Bad example
foo_var = Variable.get("foo") # AVOID THAT
bash_use_variable_bad_1 = BashOperator(
task_id="bash_use_variable_bad_1", bash_command="echo variable foo=${foo_env}", env={"foo_env": foo_var}
)
# Good example
bash_use_variable_good = BashOperator(
task_id="bash_use_variable_good",
bash_command="echo variable foo=${foo_env}",
env={"foo_env": "{{ var.value.get('foo') }}"},
)
문제점 : 이메일주소는 Variable 어떻게 하지?
안타깝게도, jinja template 로 변환이 안되는 경우가 꽤 많다. Operator 내에서는 "var.value.키" 형태의 바인딩이 지원되는경우가 있어서 어찌어찌 구성이 가능한데.. DAG 자체의 인자값들은 이런 바인딩을 할 수가 없어서 어떻게 최적화를 해야하지??? 아무리 고민해도 답이 나오지 않았다. (보통 default_args 에 담아지는 값들)
동일한 DAG 로직 하나로 관리하고, 환경별 (dev/test/stage/prod) 다른 케이스는 Variable 을 통해 분기하는 형태로 구현했었다.
이건 operator 에서 선언하는게 아니라 DAG 를 선언할때 사용하는 값들이다보니 가이드되는 방식으로는 표현이 되지 않았다.
- 개발/운영환경의 데이터 유입 시작이 달라서 start_date 를 분기하거나
- 개발/운영환경별 장애 전파를 슬랙채널의 이메일 보내기를 통해 연동하는데 채널이 달라 메일주소가 다르거나
profile = Variable.get('profile')
start_date = datetime(2023, 12, 1, 0, tzinfo=local_tz) if profile == 'prod' else datetime(2023, 11, 1, 0, tzinfo=local_tz)
ALARM_EMAIL = 'alarm_prod_xdsfas'@xx.org.slack.com' if profile == 'prod' else 'alarm_test_asdfasdf'@xx.org.slack.com'
default_args = {
'owner': owner,
'start_date': start_date,
'email': [ALARM_EMAIL, 'me@foo.com'],
'retries': 2,
'retry_delay': timedelta(minutes=1),
'provide_context': True,
'execution_timeout': timedelta(minutes=70),
}
해결방안
근본적으로 Variable.get() 을 사용하지 말라는 이유는, DAG() 로직은 로딩시점에도 호출되므로 실행될 우려가 있고, Variable 의 값은 DBMS에 저장되어 불필요한 연산이 꽤 걸린다는점이다. (실제 무거운 쿼리는 아니겠지만 N개의 DAG 들이 존재한다면 무시 못할수 있다)
그래서 이 문제를 해결하는 가장 현실적인 방법은 Variable.get() 호출을 안할수 없으니 빠르게 할 수 있는 방법을 찾는것이다.
그 방법은 결론부터 말하면 환경변수를 Variable 로 다루는것이다. 다행히 Airflow 에는 이런 사용방법이 가능했다.
export AIRFLOW_VAR_PROFILE=dev
# 환경변수 정의후 airflow 기동!
# DAG 에서는 Variable.get('PROFILE') 형태로 값 가져올수 있음
위와 같이 "AIRFLOW_VAR_키=값" 형태의 환경변수를 선언하고 이걸 Variable 에서 가져오는게 가능하다는말이다.
이렇게 되면, 해당 값은 DBMS 에 저장되는것이 아니라 환경변수의 값을 가져오는것이므로 비용이 상대적으로 낮아지게된다.
주의할점
주의할점은 UI 상에서는 보이지 않는 특별한 Variable 이므로, UI 에서 동일한 KEY 를 등록하는 실수를 할 수 있는데 이를 주의하자.
환경변수 기반의 Variable 의 KEY 값은 꼭 대문자로 선언해야 동작하기 때문에, 환경변수 기반의 Variable 은 대문자, dbms 에 저장되어 관리되는 Variable 은 소문자로 관리하는게 속편할것이다. 왜냐면, 코드를 쫒아가보면 key.upper() 형태로 변경후 환경변수값을 가져오는 형태로 구현되어있어서 그런걸로 추정된다.
airflow 의 DAG 를 구성해서 동작하는것에 집중했는데, 요즘은 좀더 좋은 방법이 무엇인지 고민하다보니 더 신경쓸게 많은것 같다.
'데이터처리 > Airflow' 카테고리의 다른 글
Airflow 2.x 의 주요 퍼미션별 권한 6개 상세 설명 : 캡쳐포함 (1) | 2024.08.28 |
---|---|
[오류] Airflow 에서 logical_date 가 잘못 생성되는 문제 - catchup 문제 (0) | 2023.12.14 |
[오류] DAG 의 filepath 값이 잘못 로딩되는 현상 원인과 해결방법 (0) | 2023.12.13 |
[AIRFLOW] BashOperator 에서 시스템 환경변수가 로딩안되는 문제 - env 지정시 (0) | 2023.12.11 |
[AIRFLOW] logical_date 의 개념 이해하기- execution_date 의 대체 (1) | 2023.12.05 |