Flink 의 "streaming-source.enable" 기능을 이용해서 hive의 데이터를 스트림 데이터스럽게 처리하려고 했는데 희안하게 어떤 테이블은 잘 되는데, 어떤 테이블은 또 안되는 현상이 발생되었다. 그 이유는 결론부터 말하면 파티션이 약 3만개를 넘어가면 새로운 파티션이 생긴걸 인지 못하는 문제가 있다. (지원안하면 오류를 내야지... 뭔가 버그가 아닌가 싶다) 파티션 감지할때 갯수제한이 있다고? 기능 테스트를 위해 새로 만든테이블에서는 잘 되는데, 기존테이블을 이용해 시도하는데 안된다면 이 문제일 확률이 높다. 소스코드를 분석해보면 알겠지만, 메타스토어에서 파티션 정보를 리턴받고, 이 데이터를 어플리케이션에서 필터링하고 기준값보다 큰 파티션이 무엇인가를 찾아가는 과정을 반복한다. 문제는..
flink 에서 hive 는 unbounded sacn 을 지원한다. 더 정확히 말하면 파티션이나 파일이 생기는걸 주기적으로 감시하다가 데이터를 조회하는 한다는게 더 맞을지도 모르겠다. 이게 뭔 의미가 있나 싶겠지만 스트림 데이터를 다루는것처럼 로직을 flink 에 submit 하면 새로운 파티션이 생길때마다 별도의 스케쥴링 없이 운영이 가능하다는 말이다. 즉, 배치형태로 작업을 구성하지 않고, 스트리밍 데이터를 다루듯 운용할수 있다는 말이다. Hive 데이터를 스트림 데이터처럼 다루기 hive 에 데이터를 적재할때 보통 파티션 단위로 데이터를 적재한다. 만약, airflow 를 이용해 데이터를 데이터를 재가공하는 로직을 등록한다면 특정 파티션을 센싱하고 있다가 생성이 되면 로직이 실행되는 구조로 DAG..
flink 에서 hive 를 연동할때, catalog 를 등록하면 바로 테이블 조회가 가능하다. 그런데 최대 슬롯은 10개이고, 여유 슬롯이 7개뿐인데 터무니없이 크게 task 를 할당해서 작업이 취소되는 경우가 발생했다. 다음과 같이 간단한 쿼리를 실행했더니 터무니없이 큰 640개의 task 가 요구되었고 당연히 리소스 부족으로 종료되었다. 해결하는 방법은 hive source 의 parallel 을 제한하면 해결된다. Flink SQL> CREATE CATALOG myHive with ( 'type' = 'hive', 'hive-conf-dir' = '/usr/hdp/2.6.2.0-205/hive/conf' ); Flink SQL> select count(*) from myHive.myDb.t_sam..
Yarn 기반에서 Flink 클러스터를 기동하려고 했는데 다음과 같은 오류가 발생했다. 구글링을 해보니 중국어 사이트에서 해결방법을 찾았다. $ ./bin/yarn-session.sh --detached ..생략.. java.lang.NoClassDefFoundError: com/sun/jersey/core/util/FeaturesAndProperties at java.lang.ClassLoader.defineClass1(Native Method) ~[?:?] at java.lang.ClassLoader.defineClass(ClassLoader.java:1016) ~[?:?] at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:1..
flink 에서는 create table like 구분이 있어서, 필드가 동일한 필드를 상속받듯이 필드선언을 할 수 있다. 하지만 alter table 에 대한 document 를 확인해보면, 테이블의 이름과 테이블 속성값만 바꿀수 있다. https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/alter/#alter-table 재밌는건, table api 에는 addColumns, AddOrReplaceColumns, DropColumns 가 지원되지만, sql 로는 이걸 사용할수 없다는점이다. 그럼 flink 에서 sql 로 필드를 변경하는 방법은 정말 방법이 없을까? https://nightlies.apache.org/flink..
High-Availabily 를 이용하기 위해서 다음과 같이 설정을 하고 YARN 에서 클러스터를 구성하면 이상하게 1개 이상의 클러스터를 띄울수 없어서 삽질하게된 후기를 알려주고자 한다. Flink 의 주키퍼를 이용한 HA 쿠퍼네티스는 configMap 을 이용해서 HA 의 정보를 전달하지만, 일반적인 케이스에서는 Zookeeper를 이용해서 HA를 구성한다. 문서에서 보면 다음과 같은 예시가 있고 이렇게 설정이 되어야 마스터 노드 하나가 죽었을때, 대기중인 마스터노드가 active로 바뀌고 기존에 실행했던 정보를 이어서 실행할 수 있다. high-availability: zookeeper high-availability.zookeeper.quorum: localhost:2181 high-availab..
Flink 에서 세션모드로 application 이나 sql-client.sh 를 사용하려고 했는데, 이런 오류가 발생했다. 참고로 N대의 세션 클러스터를 기동하기위해 주키퍼 노드 관련 옵션을 추가해서 기동했다. 다음과 같이 기동하면 2개의 컨테이너가 유지되고, 따로 클러스터 유지가 가능하다. 참고로 -z 옵션은 "high-availability.cluster-id" 설정과 연계되는데 이게 기본적으로 yarn 의 application id 로 지정되다보니 데몬은 띄웠는데 어플리케이션이나 sql-client.sh 에서 쿼리 실행할 때 문제가 있다. ./bin/yarn-session.sh --detached -z yarn-flink-001 -nm myFlink01 ./bin/yarn-session.sh --..
결론부터 말하면, 이 현상은 checkpoint 관련 설정을 적용하고서 나타난 현상이었다. checkpoint 를 모르는 사람을 위해 간단히 쉽게 설명하면 자동세이브 기능을 생각하면 쉽다. 장애의 복구를 위해 중간중간 상태정보를 주기적으로 남기는 기능을 의미하는데, 이 주기가 도달하기전까지 결과를 지연시키는게 아닌가 싶다. 결과를 보려면? 내가 겼었던 쿼리 결과가 안나오는 현상은 checkpoint 에서 영향을 받았던 케이스 이기 때문에, 해당설정을 잠깐 줄여서 쿼리를 다시 실행하면 해결되었다. 설정을 영구하게 바꿀게 아니다보니 flink-conf.yaml 의 설정을 건드리는게 아니라 sql-client.sh 상에서 "set" 명령을 통해 확인 및 변경하는것이 좋다. (checkpoint 가 너무 자주 ..