티스토리 뷰
Flink 에서는 JDBC Connector 가 존재한다. 하지만 오피셜하게 지원되는 dbms는 총 3개 뿐이다.
MySQL과 PostgreSQL 그리고 Derby ...
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/jdbc/
JDBC는 자바에서 DBMS를 연동하기위한 인터페이스 규약이다. 그렇기 때문에 JDBC 드라이버는 동일한 사용패턴을 갖지만 DBMS별로 쿼리가 약간씩 다르다보니 이 문제를 코드에 녹여넣어야 한다. 일종의 사투리라고 해야할까? (오라클의 대표적인 사투리 쿼리(?)는 행제한이 limit 가 아니라 rownum 인것이다)
Oracle JDBC Connector 빌드 & 적용방법
oracle 을 지원하도록 jdbc connector 는 이미 내가 수정했고, 해당 코드는 내 github 에 적용해두었다. 클래스 2개를 추가하고, 클래스 2개의 코드 일부를 수정하면 될 정도로 수정량은 크지 않다. (단, flink 1.13버전기준)
https://github.com/jungkoo/flink/pull/1
빌드방법
git clone https://github.com/jungkoo/flink.git -b oracle-jdbc-support-1.13
cd flink/flink-connectors/flink-connector-jdbc/
mvn clean package -Dmaven.test.skip=true
적용방법
기존 jdbc connector 를 삭제하고, 새로 빌드한 jdbc connector jar 로 바꿔치면 된다. 빌드한 경로에 있다고 가정하면 다음과 같이 명령을 날린후 재기동하면 된다.
## jdbc connector 바꾸기
mkdir -p ${FLINK_HOME}/lib-backup
mv ${FLINK_HOME}/lib/*flink-connector-jdbc*.jar ${FLINK_HOME}/lib-backup
cp ./target/flink-connector-jdbc_2.11-1.13-SNAPSHOT.jar ${FLINK_HOME}/lib/
Flink 에서 Oracle Insert 테스트 (예시)
빌드한 jdbc connector 로 바꿔치기를 했다면 다음과 같이 오라클의 테이블을 맵핑해서 insert 테스트를 해볼 수 있다. 단, flink sql 에서는 테이블을 맵핑해서 연결하는 구조이기 때문에 oracle에서 사용하는 테이블은 수동으로 생성이 되어있어야 한다. (=예시에서는 test.t_flink_test 테이블이 된다)
Flink SQL> CREATE TABLE t_flink_oracle (
keyword STRING,
ymd TIMESTAMP,
cnt DECIMAL,
PRIMARY KEY (keyword, ymd) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:oracle:thin:@127.0.0.1:1526:test',
'table-name' = 'test.t_flink_test',
'username' = '{아이디}',
'password' = '{암호}'
);
Flink SQL> INSERT INTO t_flink_oracle VALUES ('키워드', TO_TIMESTAMP('2021-01-02', 'yyyy-MM-dd'), 10);
만약 오류가 난다면 타입을 강하게 체크해서 그런걸수 있으니 타입을 체크해보자. 특히 sql-client.sh 의 콘솔에는 에러메시지가 명확하게 안보일수 있어서 flink dashboard 에서 subtask 의 log를 확인해야 원인을 찾을수 있으니 참고하도록 하자.
'데이터처리 > Flink' 카테고리의 다른 글
[Flink] PostgreSQL 연동시 Doesn't support Postgres type 'jsonb' yet 문제 (0) | 2022.01.26 |
---|---|
[Flink] op 필드 없는 "debezium-avro-confluent" 포맷 사용 방법 (0) | 2022.01.12 |
[오류] Flink에서 HDFS 데이터 Sink 할때 inprogress 로 flush 안되는 현상 (0) | 2021.11.12 |
[오류] Flink SQL 에서 HIVE 데이터를 조회 못하는 문제 (0) | 2021.11.11 |
[세팅] Flink SQL 에서 HIVE Connector 연동하기 - hive 2.3.6 (0) | 2021.11.10 |