Flink 에서는 Savepoint 와 Checkpoint 라는 개념이 있는데, 복구를 할때 사용할 메타정보들을 활용한다. 이 정보는 "state.checkpoint.dir" 이나 "state.savepoints.dir" 를 설정한 디렉토리의 하위에 _metadata 라는 파일로 존재한다. 이 파일을 확인해 보고 싶은경우가 있는데 다음과 같이 코드를 만들면 확인할 수 있다. _metadata 파일 확인하기 (java) 소스코드를 확인하면 복호화 하는 메소드를 금방 찾을 수 있는데, 문제는 파일로 저장할때는 이 byte[] 앞에 "매직넘버"와 "버전정보"가 추가로 저장되기 때문에, 해당 값을 읽어야 그 이후 데이터를 정상 처리할 수 있다는 점만 주의하면 된다. import org.apache.flink.r..
시간 관련된 필드를 다룰때 글로벌을 고려하면 복잡할게 많아진다. 동일한 이벤트 발생 시점이라도 어떤 타임존에 있느냐에 따라서 시간을 표시하는게 달라지기 때문이다. 이런걸 고려하기 위해서 요즘은 절대적인 날짜값을 저장하고, 타임존 정보를 이용해서 시간을 다루는 필드들도 생겨나기 시작했다. 물론, Flink 에도 타임존을 고려한 날짜 필드가 존재한다. 그게 TIMESTAMP_LTZ 타입이다. 이 타입은 다음과 같이 Flink 에서 타임존 값에 영향을 받는다. SET 'table.local-time-zone' = 'Asia/Shanghai'; 타임존에 맞춰서 조회하려면? TIMESTAMP_LTZ 필드는 타임존 세팅값에 따라서 값을 계산해서 출력해준다. 쉽게 생각해서 내부적으로는 UTC 와 같은 절대기준(?)의..
보통 Unique Count 를 구하기위해서 다음과 같은 쿼리를 많이 사용한다. 유니크 카운트가 필요한 대표적인 사례가 방문한 사람이 몇명인지 카운팅하는 User Count 를 구할때이다. 단순한 count 가 아니라, 중복 방문을 제거후 카운팅을 해야하다보니 비용이 비싼 쿼리이다. SELECT count(distinct user_id) FROM logs WHERE ymd = '2022-01-02'; 1개의 리듀서로 처리된다? count(distinct ...) 형태로 쿼리를 날리면, "mapred.reduce.tasks=100" 같이 리듀서 갯수를 강제로 키워도 1개의 리듀서로 처리된다. 즉, 병렬처리가 안되기 때문에 매우 큰 데이터에서 이런 쿼리를 돌리게 되면?? 결과 보기가 하늘의 별따기가 된다. ..
Hive 에서 쿼리를 돌리다보면, 특정 리듀서 하나에서 작업이 안끝나고 무한정 대기하는 경우가 종종있다. 이런 경우 skewed 형태의 데이터구조일 확률이 높다. skewed 라는건 데이터가 균일하지 않고, 특정 key 에 데이터가 쏠려있음을 의미한다. 특정 리듀서에 데이터가 쏠려있어서 병목이 되는 현상이다. (이미지에서 1개의 task 만 끝나지 않는 상황인걸 보면 쉽게 이해 할수 있다) Skewed 데이터 문제 이런 데이터 쏠림현상은 생각보다 흔하다. 우리나라 성씨만 봐도 "김씨" 가 압도적으로 많다. 아마 ㄱㄴㄷ 형태로 담당자를 정하면 ㄱ을 담당한 사람은 숫자 카운팅할때 엄~~청 오래걸릴수 밖에 없을것이다. 이런 문제를 해결하려면 데이터 쏠림이 심한 테이터를 분산처리할 수 있도록 유도해주어야 한다...
구글시트에는 웹의 데이터를 추출해서 시트의 값으로 활용할 수 있는 좋은 기능이 있다. importXML 과 importHTML 이 대표적이다. 하지만, RESTAPI 형태로 제공되는 데이터의 경우 일반적으로 json 데이터를 사용한다. 그런데 기본적으로 importJSON 이라는 펑션이 제공되지 않는다. 하지만, 확장기능을 통해 설치하고 쉽게 데이터를 시트에 표현할 수 있다. importJSON 설치하기 google script 에 함수를 복붙해서 추가하는 방법으로 가이드된 내용도 많이 있지만, 확장프로그램에서 쉽게 설치하는 방법이 있어서 그 방법을 가이드 하고자 한다. 아마 이미지만 대충 봐도 설치와 활성화는 매우 쉽게 할 수있을것으로 보인다. 1 . 마켓에서 importJSON 설치하기 구글시트의..
Hive 에서 list 와 같이 N개의 아이템이 담기는 필드를 행으로 풀어낼때 explode 쿼리를 사용한다. 하지만 flink 에서는 아무리 검색해도 해당 문법이 잘 안나오는데 그 이유는 hive 에서는 explode 라는 문법을 쓰지만 flink sql 에서는 cross join unnest 라는 문법을 사용해야 하기 때문에 검색이 안된 문제였다. explode 쿼리 예시 100번의 설명보다는 하나의 예시가 더 도움이 될것같아 예시를 들어보겠다. 다음과 같이 authros 필드에 n개의 필드가 1개의 row 를 풀어내려면 어떻게 해야할까? Flink SQL 로 표현하면 아래와 같이 cross join unnest 를 이용해 표현 가능하다. SELECT book_title, price, author.n..
Flink 에서는 sql-client.sh 를 실행해서 쿼리기반으로 실행하는 방법이 있고, dataStream API 를 이용해서 직접 java 코드를 짜서 만드는 방법이 있다. 두개를 병행해서 테스트하다보니 avro 라이브러리 충돌이 일어났다. 내가 만들려던건 kafka의 토픽데이터를 json 형태로 로컬 파일로 덤프내리는 flink application 을 만들려는 시도였는데 다음과 같은 오류가 발생되었다. 이걸 해결하는건 라이브러리 충돌이 원인이었고 우회하는건 빌드시점에 충돌안되게 하는게 가장 편했다. $ ./flink run /home1/myhome/flink-app/target/FlinkSample-1.0-SNAPSHOT.jar -kafka.bootstrap.servers=127.0.0.1:90..
Flink 로직을 돌릴때 로그에 "Could not acquire the minimum required resources" 라는 메시지가 보이면 이유는 리소스가 부족하기 때문이다. 보통 데이터를 처리할때는 DAG 라고 해서 데이터의 처리 흐름을 그래프로 그리고, 각 작업을 돌리게 된다. (그림에서 노란 동그라미가 하나의 슬롯이라고 생각하면 편하다) 하지만, 이 데이터를 처리하기위한 슬롯의 여유가 충분하지 않아서 데이터를 처리할수 없음을 의미한다. 이걸 해결하는건 taskmanager 를 더 추가로 늘려주면 된다. 원인과 해결 원인은 위에서 설명한것처럼 실행할 여유 슬롯이 부족하기 때문이다. 현재 여유 슬롯의 갯수를 보는 가장 쉬운건 flink dashboard (기본포트는 8081) 에서 확인이 가능하다..