티스토리 뷰
반응형
execution_date 이해하기
Airflow 에는 execution_date 라는 개념이 있다. 그런데 매우 헛갈리게 이름을 정한거 같다.
이름만 보면 Processing 되는 시간으로 보이지만, 사실은 스케쥴의 시작시간으로 보면 더 이해가 빠르다.
스케쥴의 간격은 10분이라고 가정하고, 특정 시점을 예로 값을 정리하면 아래와 같다.
구분값 | 날짜 예시 |
Processing Time (스케쥴된 실행시점) | 2023-02-21 12:40:02 |
prev_execution_date | 2023-02-21 12:20:00 |
execution_date | 2023-02-21 12:30:00 |
next_execution_date | 2023-02-21 12:40:00 |
왜 이런값이 나올까? crontab 을 기반으로 작업하는 사람은 사실 Process Time 만 정의가 가능하다.
하지만, Airflow 에서는 단순히 스케쥴을 특정시간에 호출하는 개념이 아니라 논리적인 시간의 윈도우가 존재한다.
즉, 아래 그림에서 B 시점의 스케쥴은 사실 12:30 ~ 12:40 의 데이터 처리를 해야하기 때문에 12:40분에 실행되어야 하고,
해당 B 스케쥴의 execution_date 는 사이클의 시작날짜인 12:30 이 되는개념이다.
타임존 문제와 포맷 변경하기
execution_date 를 DAG 코드에서 바로 사용하면 값이 이상하다. 그 이유는 UTC 기준의 값으로 나왔기 때문이다.
타임존을 적용하려면, 아래와 같인 start_date 지정시 타임존을 추가하고, 템플릿에서 사용할때 변환하는 함수를 사용해야한다.
이때 날짜와 시간 모두 표현하려면 날짜포맷을 지정하면 한번에 모든걸 해결할 수 있다.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
import pendulum
local_tz = pendulum.timezone("Asia/Seoul")
DAG_ID = "myuser_sample_dag".join([owner, dag_name, version])
default_args = {
'owner': 'myuser',
'start_date': datetime(year=2023, month=2, day=21, hour=0, minute=0, tzinfo=local_tz),
'retries': 0,
'retry_delay': timedelta(minutes=5),
'email': ['myuser@haha.com'],
'email_on_failure': True,
'email_on_retry': False,
}
dag = DAG(
dag_id=DAG_ID,
user_defined_macros={'local_dt': lambda execution_date: execution_date.in_timezone(local_tz).strftime("%Y-%m-%d %H:%M:%S")},
default_args=default_args,
schedule_interval=timedelta(minutes=10)
)
sleep_5min = BashOperator(
task_id='echo',
bash_command="""
echo "date => `date`"
echo "logical_date => {{logical_date}}"
echo "execution_date => {{execution_date.strftime("%Y-%m-%d %H:%M:%S")}}"
echo "next_execution_date => {{next_execution_date.strftime("%Y-%m-%d %H:%M:%S")}}"
echo "prev_execution_date => {{prev_execution_date.strftime("%Y-%m-%d %H:%M:%S")}}"
echo "local_dt(execution_date) => {{local_dt(execution_date)}}"
echo "local_dt(next_execution_date) => {{local_dt(next_execution_date)}}"
echo "local_dt(prev_execution_date) => {{local_dt(prev_execution_date)}}"
""",
dag=dag
)
위와 같은 샘플코드를 실행한다면? 아래와 같이 한국시간 기준으로 변환된 execution_date 를 볼수 있고, 시분초까지도 포맷팅해서 볼 수 있다.
[2023-02-21 12:40:02,805] {subprocess.py:85} INFO - Output:
[2023-02-21 12:40:02,809] {subprocess.py:92} INFO - date => Tue Feb 21 12:40:02 JST 2023
[2023-02-21 12:40:02,810] {subprocess.py:92} INFO - logical_date => 2023-02-21T03:30:00+00:00
[2023-02-21 12:40:02,811] {subprocess.py:92} INFO - execution_date => 2023-02-21 03:30:00
[2023-02-21 12:40:02,812] {subprocess.py:92} INFO - next_execution_date => 2023-02-21 03:40:00
[2023-02-21 12:40:02,812] {subprocess.py:92} INFO - prev_execution_date => 2023-02-21 03:20:00
[2023-02-21 12:40:02,813] {subprocess.py:92} INFO - local_dt(execution_date) => 2023-02-21 12:30:00
[2023-02-21 12:40:02,814] {subprocess.py:92} INFO - local_dt(next_execution_date) => 2023-02-21 12:40:00
[2023-02-21 12:40:02,814] {subprocess.py:92} INFO - local_dt(prev_execution_date) => 2023-02-21 12:20:00
[2023-02-21 12:40:02,815] {subprocess.py:96} INFO - Command exited with return code 0
반응형
'데이터처리 > Airflow' 카테고리의 다른 글
댓글