티스토리 뷰
최근 검색량이 높은 키워드가 무엇인지를 알고 싶을때, flink 에서는 호핑윈도우(슬라이딩 윈도우) 기반으로 지정하고, 슬라이드 사이즈와 데이터 간격을 지정해서 로직을 유도하여 만드는걸 구성했다. 이해를 돕기위해 대충 쿼리를 표현하면 아래와 같다.
하지만, 처음에는 잘 동작하다가 어느순간 backpress 가 발생해서 데이터 지연으로 제대로 처리안되는 문제가 발생했다.
INSERT INTO top_keyword_slide
...생략....
FROM TABLE(
HOP(
DATA => TABLE kafka_log,
TIMECOL => DESCRIPTOR(log_time),
SLIDE => INTERVAL '30' SECOND,
SIZE => INTERVAL '10' MINUTES)
)
WHERE
valid = true
GROUP BY window_start, window_end, keyword
)
)
WHERE `rank` <= 10001
해결방법
backpressure 가 발생하는 이유는 스트리밍데이터를 제한시간안에 처리하지 못해서 계속 밀리는 현상이 발생되기 때문이기 때문이다.
즉, 이 문제에서는 데이터를 처리하는 task 가 왜 그 데이터를 처리하지 못했는가에 대한 접근부터 해야한다.
그러면 일꾼을 늘리거나, 짐을 줄이거나 2가지 접근법으로 생각해야한다. (그 이후엔 물리적인 튜닝이 필요)
1. Parallelism 올리기
가장 손쉬운 방법이다. 대시보드에서 상태를 체크해보고, 각 로직을 봤을때 Parallelism 이 1 로 구성되어 있다면, 데이터 크기가 꽤 크기 때문에 발생되는것이다. 만약, 1로 세팅되어있다면 쿼리 실행전에 parallelism 값을 더 올려서 실행해보도록 하자.
예를 들어 아래와 같이 세팅하면, 각 작업의 병렬화가 올라가서 해결이 되는경우가 있다.
하지만, 근본적으로 source 데이터의 성능문제나 카티널리티가 너무 높은 데이터의 윈도우 사이즈가 너무 크거나 하면 이걸 올려도 해결안될수 있다.
set parallelism.default=8;
2. 분석 데이터양을 줄이기
busy 상태의 task 를 모니터링해서 다음 시간텀내에 task 가 처리되는지 확인해봐야한다.
예를 들어, 10분에 30초 슬라이드로 데이터를 뽑는다면, 하나의 사이클은 30초이내에 완료처리가 되어야한다.
그렇지 않으면 지연이 일어나면서, backpressure 가 발생되어 이후 데이터는 계속 지연이 되는 아비규환(?)의 상태가 된다.
이걸 완화하는건 task 의 갯수를 늘리는게 가장 첫번째 방법이지만, 무한정 parallel 을 올리는것은 불가능하다.
그럼 반대로 처리해야할 데이터를 줄여내는 방법밖에 없다.
2.1 윈도우 사이즈 / TOP N 갯수 줄이기
예를 들면, 아래와 같이 윈도우 사이즈를 10분에서 5분으로 줄이고, top N 도 1만건에서 1천건 수준으로 낮춰보는것이다.
INSERT INTO top_keyword_slide
...생략....
FROM TABLE(
HOP(
DATA => TABLE kafka_log,
TIMECOL => DESCRIPTOR(log_time),
SLIDE => INTERVAL '30' SECOND,
--SIZE => INTERVAL '10' MINUTES
SIZE => INTERVAL '5' MINUTES
)
)
WHERE
valid = true
GROUP BY window_start, window_end, keyword
)
)
-- WHERE `rank` <= 10001
WHERE `rank` <= 1001
2.2 데이터 필터링 및 파이프라인 구성
TOP N 의 숫자를 줄일수 없다면, 처리할 데이터의 범위를 축소하거나 노멀라이즈 하는 방법이 필요하다.
예를 들어, 검색어의 경우 모두 집계 대상으로 한다면 별의별 키워드가 다 집계될텐데 (오타나 문장 같은 이상한 데이터까지) 이런 데이터를 필터링해서 집계한다면 불필요한 데이터 처리를 할 필요가 없어서 지연이 일어날 확률이 적어진다.
예를 들면, 검색어 TOP N 을 뽑을때 필터링 한다면 아래와 같은 조건을 추가하면 그룹핑될 데이터의 처리량을 줄일수 있다.
- 검색어 길이가 15글자를 넘어가는걸 제외한다거나 (문장형 검색어 제외)
- 검색결과가 0건인 로그를 제외 (희귀 키워드는 제외)
- 키워드 노멀라이징 ( 에르메스, 애르메스, 애르매스, hermes -> 에르메스)
3. 물리적인 튜닝
1,2 번이 가장 쉽게 해볼수 있는 시도이고, 그 다음은 task 의 지연로직이 뭔지 분석후 접근을 해야한다.
만약, kafka 데이터 fetch 해오는곳에서 지연이 일어난다면 kafka 에서 consum 할때의 성능튜닝을 봐야하고, sink 할때 지연이 일어난다면 sink 되는 스토리지의 write 튜닝방법을 찾아야 할것이다.
그리고, 중간 단계에서 데이터 처리하는 로직에서 문제가 된다면, 단일 쿼리로 만드는게 아니라 중간 데이터를 만들고, 그 데이터를 재가공할때 성능을 올릴수 있는 스토리지를 붙이거나 아님 물리적인 데이터 자체가 커서 문제라면 flink 의 task manager 를 늘려서 task 의 병렬화를 높여서 해결해야할것이다.
'데이터처리 > Flink' 카테고리의 다른 글
[FLINK] sql-client 에서 작업이름(job name)을 직접 지정하는 방법 (0) | 2024.01.09 |
---|---|
[Flink] Hive Catalog 등록 실패 : RangerAccessControlException 오류 (0) | 2023.03.13 |
[Flink] UnsupportedFileSystemSchemeException - 데몬 기동 오류 해결하기 (0) | 2023.02.20 |
[FLINK] Yarn Session 클러스터를 띄울때 Deployment took more than 60 seconds 대기하는 문제 해결방법 (3가지 케이스) (0) | 2022.12.01 |
[FlinkSQL] Kafka 연동시 SCRAM-SHA-512 인증이 필요한 경우 해결방법 (0) | 2022.10.14 |