얼마전 Flink 1.15 가 릴리즈 되었고, 테스트를 하는데 HA 관련 설정을 하면 오류가 발생했다. 주키퍼를 이용해 HA 를 처리하게 되고 관련된 설정은 "high-availability.zookeeper.quorum" 이다. 로그를 zookeeper 와 curator 관련 클래스에서 문제가 된걸 알수 있다. 결론부터 말하면 주키퍼 버전의 문제였다. 왜 이런 일이 일어났을까? 2022-05-12 13:11:45,213 INFO org.apache.flink.shaded.curator5.org.apache.curator.framework.state.ConnectionStateManager [] - State change: CONNECTED 2022-05-12 13:11:45,453 INFO org.ap..
elasticsearch 의 연결가능성만 체크하고 싶은데, "Java High Level REST" 의 의존성은 너무 많기도 하고 번거로움이 많아서 RESTAPI 를 직접 사용해서 연결가능성만 체크하는 로직을 만들어 보았다. 구글링해보면 이거저거 나오긴 하는데 user, password 처리하는 샘플은 잘 나오지 않아서 좀 헤맸던거 같다. RestTemplate 로 직접 health 체크 요청하기 ElasticSearch는 대부분 restapi 를 제공하고 있고, http://127.0.0.1:9300/_cat/health?v 형태로 상태를 조회가능하다. 가장 쉬운건 이 주소로 요청했을때 정상적으로 리턴이 되는지 체크하면 연결가능여부를 판단할 수 있다. Spring 을 사용중이라면, RestTemplat..
Flink 의 "streaming-source.enable" 기능을 이용해서 hive의 데이터를 스트림 데이터스럽게 처리하려고 했는데 희안하게 어떤 테이블은 잘 되는데, 어떤 테이블은 또 안되는 현상이 발생되었다. 그 이유는 결론부터 말하면 파티션이 약 3만개를 넘어가면 새로운 파티션이 생긴걸 인지 못하는 문제가 있다. (지원안하면 오류를 내야지... 뭔가 버그가 아닌가 싶다) 파티션 감지할때 갯수제한이 있다고? 기능 테스트를 위해 새로 만든테이블에서는 잘 되는데, 기존테이블을 이용해 시도하는데 안된다면 이 문제일 확률이 높다. 소스코드를 분석해보면 알겠지만, 메타스토어에서 파티션 정보를 리턴받고, 이 데이터를 어플리케이션에서 필터링하고 기준값보다 큰 파티션이 무엇인가를 찾아가는 과정을 반복한다. 문제는..
flink 에서 hive 는 unbounded sacn 을 지원한다. 더 정확히 말하면 파티션이나 파일이 생기는걸 주기적으로 감시하다가 데이터를 조회하는 한다는게 더 맞을지도 모르겠다. 이게 뭔 의미가 있나 싶겠지만 스트림 데이터를 다루는것처럼 로직을 flink 에 submit 하면 새로운 파티션이 생길때마다 별도의 스케쥴링 없이 운영이 가능하다는 말이다. 즉, 배치형태로 작업을 구성하지 않고, 스트리밍 데이터를 다루듯 운용할수 있다는 말이다. Hive 데이터를 스트림 데이터처럼 다루기 hive 에 데이터를 적재할때 보통 파티션 단위로 데이터를 적재한다. 만약, airflow 를 이용해 데이터를 데이터를 재가공하는 로직을 등록한다면 특정 파티션을 센싱하고 있다가 생성이 되면 로직이 실행되는 구조로 DAG..
flink 에서 hive 를 연동할때, catalog 를 등록하면 바로 테이블 조회가 가능하다. 그런데 최대 슬롯은 10개이고, 여유 슬롯이 7개뿐인데 터무니없이 크게 task 를 할당해서 작업이 취소되는 경우가 발생했다. 다음과 같이 간단한 쿼리를 실행했더니 터무니없이 큰 640개의 task 가 요구되었고 당연히 리소스 부족으로 종료되었다. 해결하는 방법은 hive source 의 parallel 을 제한하면 해결된다. Flink SQL> CREATE CATALOG myHive with ( 'type' = 'hive', 'hive-conf-dir' = '/usr/hdp/2.6.2.0-205/hive/conf' ); Flink SQL> select count(*) from myHive.myDb.t_sam..
Yarn 기반에서 Flink 클러스터를 기동하려고 했는데 다음과 같은 오류가 발생했다. 구글링을 해보니 중국어 사이트에서 해결방법을 찾았다. $ ./bin/yarn-session.sh --detached ..생략.. java.lang.NoClassDefFoundError: com/sun/jersey/core/util/FeaturesAndProperties at java.lang.ClassLoader.defineClass1(Native Method) ~[?:?] at java.lang.ClassLoader.defineClass(ClassLoader.java:1016) ~[?:?] at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:1..
flink 에서는 create table like 구분이 있어서, 필드가 동일한 필드를 상속받듯이 필드선언을 할 수 있다. 하지만 alter table 에 대한 document 를 확인해보면, 테이블의 이름과 테이블 속성값만 바꿀수 있다. https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/alter/#alter-table 재밌는건, table api 에는 addColumns, AddOrReplaceColumns, DropColumns 가 지원되지만, sql 로는 이걸 사용할수 없다는점이다. 그럼 flink 에서 sql 로 필드를 변경하는 방법은 정말 방법이 없을까? https://nightlies.apache.org/flink..
High-Availabily 를 이용하기 위해서 다음과 같이 설정을 하고 YARN 에서 클러스터를 구성하면 이상하게 1개 이상의 클러스터를 띄울수 없어서 삽질하게된 후기를 알려주고자 한다. Flink 의 주키퍼를 이용한 HA 쿠퍼네티스는 configMap 을 이용해서 HA 의 정보를 전달하지만, 일반적인 케이스에서는 Zookeeper를 이용해서 HA를 구성한다. 문서에서 보면 다음과 같은 예시가 있고 이렇게 설정이 되어야 마스터 노드 하나가 죽었을때, 대기중인 마스터노드가 active로 바뀌고 기존에 실행했던 정보를 이어서 실행할 수 있다. high-availability: zookeeper high-availability.zookeeper.quorum: localhost:2181 high-availab..