Hive 에서 list 와 같이 N개의 아이템이 담기는 필드를 행으로 풀어낼때 explode 쿼리를 사용한다. 하지만 flink 에서는 아무리 검색해도 해당 문법이 잘 안나오는데 그 이유는 hive 에서는 explode 라는 문법을 쓰지만 flink sql 에서는 cross join unnest 라는 문법을 사용해야 하기 때문에 검색이 안된 문제였다. explode 쿼리 예시 100번의 설명보다는 하나의 예시가 더 도움이 될것같아 예시를 들어보겠다. 다음과 같이 authros 필드에 n개의 필드가 1개의 row 를 풀어내려면 어떻게 해야할까? Flink SQL 로 표현하면 아래와 같이 cross join unnest 를 이용해 표현 가능하다. SELECT book_title, price, author.n..
Flink 에서는 sql-client.sh 를 실행해서 쿼리기반으로 실행하는 방법이 있고, dataStream API 를 이용해서 직접 java 코드를 짜서 만드는 방법이 있다. 두개를 병행해서 테스트하다보니 avro 라이브러리 충돌이 일어났다. 내가 만들려던건 kafka의 토픽데이터를 json 형태로 로컬 파일로 덤프내리는 flink application 을 만들려는 시도였는데 다음과 같은 오류가 발생되었다. 이걸 해결하는건 라이브러리 충돌이 원인이었고 우회하는건 빌드시점에 충돌안되게 하는게 가장 편했다. $ ./flink run /home1/myhome/flink-app/target/FlinkSample-1.0-SNAPSHOT.jar -kafka.bootstrap.servers=127.0.0.1:90..
Flink 로직을 돌릴때 로그에 "Could not acquire the minimum required resources" 라는 메시지가 보이면 이유는 리소스가 부족하기 때문이다. 보통 데이터를 처리할때는 DAG 라고 해서 데이터의 처리 흐름을 그래프로 그리고, 각 작업을 돌리게 된다. (그림에서 노란 동그라미가 하나의 슬롯이라고 생각하면 편하다) 하지만, 이 데이터를 처리하기위한 슬롯의 여유가 충분하지 않아서 데이터를 처리할수 없음을 의미한다. 이걸 해결하는건 taskmanager 를 더 추가로 늘려주면 된다. 원인과 해결 원인은 위에서 설명한것처럼 실행할 여유 슬롯이 부족하기 때문이다. 현재 여유 슬롯의 갯수를 보는 가장 쉬운건 flink dashboard (기본포트는 8081) 에서 확인이 가능하다..
얼마전 Flink 1.15 가 릴리즈 되었고, 테스트를 하는데 HA 관련 설정을 하면 오류가 발생했다. 주키퍼를 이용해 HA 를 처리하게 되고 관련된 설정은 "high-availability.zookeeper.quorum" 이다. 로그를 zookeeper 와 curator 관련 클래스에서 문제가 된걸 알수 있다. 결론부터 말하면 주키퍼 버전의 문제였다. 왜 이런 일이 일어났을까? 2022-05-12 13:11:45,213 INFO org.apache.flink.shaded.curator5.org.apache.curator.framework.state.ConnectionStateManager [] - State change: CONNECTED 2022-05-12 13:11:45,453 INFO org.ap..
elasticsearch 의 연결가능성만 체크하고 싶은데, "Java High Level REST" 의 의존성은 너무 많기도 하고 번거로움이 많아서 RESTAPI 를 직접 사용해서 연결가능성만 체크하는 로직을 만들어 보았다. 구글링해보면 이거저거 나오긴 하는데 user, password 처리하는 샘플은 잘 나오지 않아서 좀 헤맸던거 같다. RestTemplate 로 직접 health 체크 요청하기 ElasticSearch는 대부분 restapi 를 제공하고 있고, http://127.0.0.1:9300/_cat/health?v 형태로 상태를 조회가능하다. 가장 쉬운건 이 주소로 요청했을때 정상적으로 리턴이 되는지 체크하면 연결가능여부를 판단할 수 있다. Spring 을 사용중이라면, RestTemplat..
Flink 의 "streaming-source.enable" 기능을 이용해서 hive의 데이터를 스트림 데이터스럽게 처리하려고 했는데 희안하게 어떤 테이블은 잘 되는데, 어떤 테이블은 또 안되는 현상이 발생되었다. 그 이유는 결론부터 말하면 파티션이 약 3만개를 넘어가면 새로운 파티션이 생긴걸 인지 못하는 문제가 있다. (지원안하면 오류를 내야지... 뭔가 버그가 아닌가 싶다) 파티션 감지할때 갯수제한이 있다고? 기능 테스트를 위해 새로 만든테이블에서는 잘 되는데, 기존테이블을 이용해 시도하는데 안된다면 이 문제일 확률이 높다. 소스코드를 분석해보면 알겠지만, 메타스토어에서 파티션 정보를 리턴받고, 이 데이터를 어플리케이션에서 필터링하고 기준값보다 큰 파티션이 무엇인가를 찾아가는 과정을 반복한다. 문제는..
flink 에서 hive 는 unbounded sacn 을 지원한다. 더 정확히 말하면 파티션이나 파일이 생기는걸 주기적으로 감시하다가 데이터를 조회하는 한다는게 더 맞을지도 모르겠다. 이게 뭔 의미가 있나 싶겠지만 스트림 데이터를 다루는것처럼 로직을 flink 에 submit 하면 새로운 파티션이 생길때마다 별도의 스케쥴링 없이 운영이 가능하다는 말이다. 즉, 배치형태로 작업을 구성하지 않고, 스트리밍 데이터를 다루듯 운용할수 있다는 말이다. Hive 데이터를 스트림 데이터처럼 다루기 hive 에 데이터를 적재할때 보통 파티션 단위로 데이터를 적재한다. 만약, airflow 를 이용해 데이터를 데이터를 재가공하는 로직을 등록한다면 특정 파티션을 센싱하고 있다가 생성이 되면 로직이 실행되는 구조로 DAG..
flink 에서 hive 를 연동할때, catalog 를 등록하면 바로 테이블 조회가 가능하다. 그런데 최대 슬롯은 10개이고, 여유 슬롯이 7개뿐인데 터무니없이 크게 task 를 할당해서 작업이 취소되는 경우가 발생했다. 다음과 같이 간단한 쿼리를 실행했더니 터무니없이 큰 640개의 task 가 요구되었고 당연히 리소스 부족으로 종료되었다. 해결하는 방법은 hive source 의 parallel 을 제한하면 해결된다. Flink SQL> CREATE CATALOG myHive with ( 'type' = 'hive', 'hive-conf-dir' = '/usr/hdp/2.6.2.0-205/hive/conf' ); Flink SQL> select count(*) from myHive.myDb.t_sam..