티스토리 뷰
Flink 에서 sql-client.sh 에서 쿼리기반으로 데이터를 가공하고, 저장(sink) 할 수 있다.
일반적으로 sink 용으로 많이 사용하는 스토리지는 hive 거나 elasticsearch 가 아닐까 싶다.
추가될 라이브러리는 "$FLINK_HOME/lib" 하위에 jar 파일을 복사하고 실행해야 한다.
필요한 라이브러리 세팅
flink 에 hive 를 연동하려면 hive 버전에 따라서 필요한 라이브러리가 다르다.
내가 사용하는 hive 의 버전은 2.3.6 이었기 때문에 다음과 같이 필요한 라이브러리를 다운로드 받아서 사용했다.
참고로, 라이브러리들은 "$FLINK_HOME/lib" 하위에 복사하면 된다.
만약, hive 버전이 다른걸 쓴다면 flink document 에서 필요한 의존파일을 꼭 확인하자. [링크]
# -------------------------------------------
# HIVE Connector 의존 라이브러리 받기
# ==================================
# 버전정보 : flink 1.14.0 + hive 2.3.6
# -------------------------------------------
cd ${FLINK_HOME}/lib
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.6_2.11/1.14.0/flink-sql-connector-hive-2.3.6_2.11-1.14.0.jar
wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.6/hive-exec-2.3.6.jar
wget https://repo1.maven.org/maven2/org/antlr/antlr-runtime/3.5.2/antlr-runtime-3.5.2.jar
hive client 설정정보 찾기
라이브러리는 준비되었고, 이제 HIVE 에 연동하기위한 hive 설정정보 위치를 찾아야 한다.
해당 서버에 hive client 가 깔려있다면 client 설정파일이 존재할텐데, 만약 세팅안된 서버면 세팅된 서버에서 xml 설정을 복사해오면 된다. 참고로 설정파일을 보면 다음과 같은 파일이 있을것 이다. flink sql 명령을 할때 이 설정파일 경로를 지정해줘야하니 기억해두자.
# -------------------------------------------------------
# Hive Configuration 폴더를 찾으면 아래와 같은 xml 설정이 존재한다 (참고)
# -------------------------------------------------------
% ls -l
-rw-r--r-- 1 user user 35160 10월 14 13:41 hive-site.xml
-rw-r--r-- 1 user user 2420 10월 14 13:45 hivemetastore-site.xml
-rw-r--r-- 1 user user 4777 10월 14 13:46 hiveserver2-site.xml
-rw-r--r-- 1 user user 2060 10월 14 13:46 ivysettings.xml
hive 연결하기
이제 sql-client.sh 를 실행해서 Flink SQL 에서 연동을 할 시간이다.
hive는 스키마를 저장하기 "메타스토어"라는 스키마 정보를 저장하는 저장소가 존재한다.
flink 에서는 스키마를 저장하는 곳을 카탈로그라고 부른다. 즉, 카탈로그라는 개념으로 hive 를 연결할수 있다.
예를 들어, 위에 이야기한 xml 형태의 설정파일이 있는 위치가 "/home/myUser/apache-hive-2.3.6-bin/conf" 에 존재한다면, 다음과 같은 쿼리를 실행하면 "myHive" 라는 이름으로 하이브가 연동된다. 기존에 flink 에서 선언한 테이블이 저장되는 카탈로그는 "default_catalog" 이니 이동할때 참고하자. 이렇게 카탈로그 이동을 해서 실제로 쿼리를 돌릴수 있다.
Flink SQL> create catalog myHive with (
> 'type' = 'hive',
> 'hive-conf-dir' = '/home/myUser/apache-hive-2.3.6-bin/conf'
> );
[INFO] Execute statement succeed.
);
Flink SQL> show catalogs;
+-----------------+
| catalog name |
+-----------------+
| default_catalog |
| myHive |
+-----------------+
Flink SQL> use catalog myHive;
[INFO] Execute statement succeed.
Flink SQL> show databases;
+----------------+
| database name |
+----------------+
| fact |
| dimension |
| default |
| sample |
+----------------+
4 rows in set
Flink SQL> set sql-client.execution.result-mode=tableau; -- 조회할때 ui 형태가 아닌 text 결과로 보기위해서
Flink SQL> select * from dimension.t_address limit 4; -- hive 의 테이블을 조회할수 있음
한가지 팁으로 hive와 연동된 카탈로그에서 (여기서는 myHive) 테이블을 생성하면 영구히 테이블이 저장된다.
Flink SQL 에서 테이블을 생성하면 그 정보가 세션에서만 유지되서 불편했다면, 꼭 hive 와 연동해야한다.
(물론, 카탈로그 등록한것도 세션에만 유지되서 실행할때 다시 등록해줘야한다는건 함정이다)
'데이터처리 > Flink' 카테고리의 다른 글
[Flink] op 필드 없는 "debezium-avro-confluent" 포맷 사용 방법 (0) | 2022.01.12 |
---|---|
[Flink] JDBC Connector 에서 "ORACLE" 연동하기 - flink 1.13 (1) | 2022.01.05 |
[오류] Flink에서 HDFS 데이터 Sink 할때 inprogress 로 flush 안되는 현상 (0) | 2021.11.12 |
[오류] Flink SQL 에서 HIVE 데이터를 조회 못하는 문제 (0) | 2021.11.11 |
[FLINK] kubernetes 환경에서 퍼시스턴트 볼륨 연동하기 - HA, checkPoint, savePoint (0) | 2021.11.05 |