Flink 1.14 에서는 잘 동작하던 쿼리가 Flink 1.15 에서 오류가 발생되었다. 쿼리는 kafka 테이블을 mysql 혹은 hdfs 에 sink 하는 로직이었고, 1.15.2 버전이 나와서 그걸 써도 동일한 문제가 발생되었다. Flink SQL> > INSERT INTO MysqlSource > SELECT > DATE_FORMAT(window_start, 'yyyy-MM-dd') AS ymd, > DATE_FORMAT(window_start, 'HHmm') AS hh24, > keyword, > COUNT(*) as cnt > FROM TABLE( > TUMBLE( > DATA => TABLE KafkaSource, > TIMECOL => DESCRIPTOR(logTime), > SIZE =..
Flink 에서 Window 단위로 데이터를 다루려면 워터마크를 지정해야하고, 워터마크를 지정하기위해서 TO_TIMESTAMP_LTZ 함수를 써서 날짜타입으로 전환해서 사용해야 하는 경우가 종종 있다. 보통 EpochTime 이 들어있는 필드를 쓴다면 아래와 같은 형태로 워터마크를 선언하려고 할것이다. CREATE TABLE source_table { ... rowtime AS TO_TIMESTAMP_LTZ(logTime, 6), // logTime 의 단위는 microsec WATERMARK FOR rowtime AS rowtime } WITH { ... }; 그리고, 원본의 logTime 필드의 값이 microsec 이라서 정밀도값을 6으로 지정하면 다음과 같은 오류가 발생한다. org.apache...
Flink 에서는 Savepoint 와 Checkpoint 라는 개념이 있는데, 복구를 할때 사용할 메타정보들을 활용한다. 이 정보는 "state.checkpoint.dir" 이나 "state.savepoints.dir" 를 설정한 디렉토리의 하위에 _metadata 라는 파일로 존재한다. 이 파일을 확인해 보고 싶은경우가 있는데 다음과 같이 코드를 만들면 확인할 수 있다. _metadata 파일 확인하기 (java) 소스코드를 확인하면 복호화 하는 메소드를 금방 찾을 수 있는데, 문제는 파일로 저장할때는 이 byte[] 앞에 "매직넘버"와 "버전정보"가 추가로 저장되기 때문에, 해당 값을 읽어야 그 이후 데이터를 정상 처리할 수 있다는 점만 주의하면 된다. import org.apache.flink.r..
시간 관련된 필드를 다룰때 글로벌을 고려하면 복잡할게 많아진다. 동일한 이벤트 발생 시점이라도 어떤 타임존에 있느냐에 따라서 시간을 표시하는게 달라지기 때문이다. 이런걸 고려하기 위해서 요즘은 절대적인 날짜값을 저장하고, 타임존 정보를 이용해서 시간을 다루는 필드들도 생겨나기 시작했다. 물론, Flink 에도 타임존을 고려한 날짜 필드가 존재한다. 그게 TIMESTAMP_LTZ 타입이다. 이 타입은 다음과 같이 Flink 에서 타임존 값에 영향을 받는다. SET 'table.local-time-zone' = 'Asia/Shanghai'; 타임존에 맞춰서 조회하려면? TIMESTAMP_LTZ 필드는 타임존 세팅값에 따라서 값을 계산해서 출력해준다. 쉽게 생각해서 내부적으로는 UTC 와 같은 절대기준(?)의..
Hive 에서 list 와 같이 N개의 아이템이 담기는 필드를 행으로 풀어낼때 explode 쿼리를 사용한다. 하지만 flink 에서는 아무리 검색해도 해당 문법이 잘 안나오는데 그 이유는 hive 에서는 explode 라는 문법을 쓰지만 flink sql 에서는 cross join unnest 라는 문법을 사용해야 하기 때문에 검색이 안된 문제였다. explode 쿼리 예시 100번의 설명보다는 하나의 예시가 더 도움이 될것같아 예시를 들어보겠다. 다음과 같이 authros 필드에 n개의 필드가 1개의 row 를 풀어내려면 어떻게 해야할까? Flink SQL 로 표현하면 아래와 같이 cross join unnest 를 이용해 표현 가능하다. SELECT book_title, price, author.n..
Flink 에서는 sql-client.sh 를 실행해서 쿼리기반으로 실행하는 방법이 있고, dataStream API 를 이용해서 직접 java 코드를 짜서 만드는 방법이 있다. 두개를 병행해서 테스트하다보니 avro 라이브러리 충돌이 일어났다. 내가 만들려던건 kafka의 토픽데이터를 json 형태로 로컬 파일로 덤프내리는 flink application 을 만들려는 시도였는데 다음과 같은 오류가 발생되었다. 이걸 해결하는건 라이브러리 충돌이 원인이었고 우회하는건 빌드시점에 충돌안되게 하는게 가장 편했다. $ ./flink run /home1/myhome/flink-app/target/FlinkSample-1.0-SNAPSHOT.jar -kafka.bootstrap.servers=127.0.0.1:90..
Flink 로직을 돌릴때 로그에 "Could not acquire the minimum required resources" 라는 메시지가 보이면 이유는 리소스가 부족하기 때문이다. 보통 데이터를 처리할때는 DAG 라고 해서 데이터의 처리 흐름을 그래프로 그리고, 각 작업을 돌리게 된다. (그림에서 노란 동그라미가 하나의 슬롯이라고 생각하면 편하다) 하지만, 이 데이터를 처리하기위한 슬롯의 여유가 충분하지 않아서 데이터를 처리할수 없음을 의미한다. 이걸 해결하는건 taskmanager 를 더 추가로 늘려주면 된다. 원인과 해결 원인은 위에서 설명한것처럼 실행할 여유 슬롯이 부족하기 때문이다. 현재 여유 슬롯의 갯수를 보는 가장 쉬운건 flink dashboard (기본포트는 8081) 에서 확인이 가능하다..
얼마전 Flink 1.15 가 릴리즈 되었고, 테스트를 하는데 HA 관련 설정을 하면 오류가 발생했다. 주키퍼를 이용해 HA 를 처리하게 되고 관련된 설정은 "high-availability.zookeeper.quorum" 이다. 로그를 zookeeper 와 curator 관련 클래스에서 문제가 된걸 알수 있다. 결론부터 말하면 주키퍼 버전의 문제였다. 왜 이런 일이 일어났을까? 2022-05-12 13:11:45,213 INFO org.apache.flink.shaded.curator5.org.apache.curator.framework.state.ConnectionStateManager [] - State change: CONNECTED 2022-05-12 13:11:45,453 INFO org.ap..