티스토리 뷰

반응형

execution_date 이해하기

Airflow 에는 execution_date 라는 개념이 있다. 그런데 매우 헛갈리게 이름을 정한거 같다.

이름만 보면 Processing 되는 시간으로 보이지만, 사실은 스케쥴의 시작시간으로 보면 더 이해가 빠르다.

스케쥴의 간격은 10분이라고 가정하고, 특정 시점을 예로 값을 정리하면 아래와 같다.

구분값 날짜 예시
Processing Time (스케쥴된 실행시점) 2023-02-21 12:40:02 
prev_execution_date 2023-02-21 12:20:00
execution_date 2023-02-21 12:30:00
next_execution_date 2023-02-21 12:40:00

왜 이런값이 나올까? crontab 을 기반으로 작업하는 사람은 사실 Process Time 만 정의가 가능하다. 

하지만, Airflow 에서는 단순히 스케쥴을 특정시간에 호출하는 개념이 아니라 논리적인 시간의 윈도우가 존재한다.

즉, 아래 그림에서 B 시점의 스케쥴은 사실 12:30 ~ 12:40 의 데이터 처리를 해야하기 때문에 12:40분에 실행되어야 하고,

해당 B 스케쥴의  execution_date 는 사이클의 시작날짜인 12:30 이 되는개념이다.

타임존 문제와 포맷 변경하기

execution_date 를 DAG 코드에서 바로 사용하면 값이 이상하다. 그 이유는 UTC 기준의 값으로 나왔기 때문이다.

타임존을 적용하려면, 아래와 같인 start_date 지정시 타임존을 추가하고, 템플릿에서 사용할때 변환하는 함수를 사용해야한다.

이때 날짜와 시간 모두 표현하려면 날짜포맷을 지정하면 한번에 모든걸 해결할 수 있다.

 

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
import pendulum


local_tz = pendulum.timezone("Asia/Seoul")
DAG_ID = "myuser_sample_dag".join([owner, dag_name, version])
default_args = {
    'owner': 'myuser',
    'start_date': datetime(year=2023, month=2, day=21, hour=0, minute=0, tzinfo=local_tz),
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
    'email': ['myuser@haha.com'],
    'email_on_failure': True,
    'email_on_retry': False,
}

dag = DAG(
    dag_id=DAG_ID,
    user_defined_macros={'local_dt': lambda execution_date: execution_date.in_timezone(local_tz).strftime("%Y-%m-%d %H:%M:%S")},
    default_args=default_args,
    schedule_interval=timedelta(minutes=10)
)

sleep_5min = BashOperator(
    task_id='echo',
    bash_command="""
        echo "date                            => `date`"        
        echo "logical_date                    => {{logical_date}}"
        echo "execution_date                  => {{execution_date.strftime("%Y-%m-%d %H:%M:%S")}}"
        echo "next_execution_date             => {{next_execution_date.strftime("%Y-%m-%d %H:%M:%S")}}"
        echo "prev_execution_date             => {{prev_execution_date.strftime("%Y-%m-%d %H:%M:%S")}}"
        echo "local_dt(execution_date)        => {{local_dt(execution_date)}}"
        echo "local_dt(next_execution_date)   => {{local_dt(next_execution_date)}}"
        echo "local_dt(prev_execution_date)   => {{local_dt(prev_execution_date)}}"
    """,
    dag=dag
)

위와 같은 샘플코드를 실행한다면? 아래와 같이 한국시간 기준으로 변환된 execution_date 를 볼수 있고, 시분초까지도 포맷팅해서 볼 수 있다. 

[2023-02-21 12:40:02,805] {subprocess.py:85} INFO - Output:
[2023-02-21 12:40:02,809] {subprocess.py:92} INFO - date                            => Tue Feb 21 12:40:02 JST 2023
[2023-02-21 12:40:02,810] {subprocess.py:92} INFO - logical_date                    => 2023-02-21T03:30:00+00:00
[2023-02-21 12:40:02,811] {subprocess.py:92} INFO - execution_date                  => 2023-02-21 03:30:00
[2023-02-21 12:40:02,812] {subprocess.py:92} INFO - next_execution_date             => 2023-02-21 03:40:00
[2023-02-21 12:40:02,812] {subprocess.py:92} INFO - prev_execution_date             => 2023-02-21 03:20:00
[2023-02-21 12:40:02,813] {subprocess.py:92} INFO - local_dt(execution_date)        => 2023-02-21 12:30:00
[2023-02-21 12:40:02,814] {subprocess.py:92} INFO - local_dt(next_execution_date)   => 2023-02-21 12:40:00
[2023-02-21 12:40:02,814] {subprocess.py:92} INFO - local_dt(prev_execution_date)   => 2023-02-21 12:20:00
[2023-02-21 12:40:02,815] {subprocess.py:96} INFO - Command exited with return code 0

 

반응형
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/11   »
1 2
3 4 5 6 7 8 9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28 29 30
글 보관함