티스토리 뷰

반응형

Flink 에서는 JDBC Connector 가 존재한다. 하지만 오피셜하게 지원되는 dbms는 총 3개 뿐이다.

MySQL과 PostgreSQL 그리고 Derby ...

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/jdbc/

 

JDBC

JDBC SQL Connector # Scan Source: Bounded Lookup Source: Sync Mode Sink: Batch Sink: Streaming Append & Upsert Mode The JDBC connector allows for reading data from and writing data into any relational databases with a JDBC driver. This document describes h

nightlies.apache.org

 

JDBC는 자바에서 DBMS를 연동하기위한 인터페이스 규약이다. 그렇기 때문에 JDBC 드라이버는 동일한 사용패턴을 갖지만 DBMS별로 쿼리가 약간씩 다르다보니 이 문제를 코드에 녹여넣어야 한다. 일종의 사투리라고 해야할까? (오라클의 대표적인 사투리 쿼리(?)는 행제한이 limit 가 아니라 rownum 인것이다)

 

Oracle JDBC Connector 빌드 & 적용방법

oracle 을 지원하도록 jdbc connector 는 이미 내가 수정했고, 해당 코드는 내 github 에 적용해두었다. 클래스 2개를 추가하고, 클래스 2개의 코드 일부를 수정하면 될 정도로 수정량은 크지 않다. (단, flink 1.13버전기준)

https://github.com/jungkoo/flink/pull/1

 

Oracle jdbc support 1.13 by jungkoo · Pull Request #1 · jungkoo/flink

oracle jdbc support test

github.com

 

빌드방법

git clone https://github.com/jungkoo/flink.git -b oracle-jdbc-support-1.13
cd flink/flink-connectors/flink-connector-jdbc/
mvn clean package -Dmaven.test.skip=true

적용방법

기존 jdbc connector 를 삭제하고, 새로 빌드한 jdbc connector jar 로 바꿔치면 된다. 빌드한 경로에 있다고 가정하면 다음과 같이 명령을 날린후 재기동하면 된다.

## jdbc connector 바꾸기
mkdir -p ${FLINK_HOME}/lib-backup
mv ${FLINK_HOME}/lib/*flink-connector-jdbc*.jar ${FLINK_HOME}/lib-backup
cp ./target/flink-connector-jdbc_2.11-1.13-SNAPSHOT.jar ${FLINK_HOME}/lib/

 

Flink 에서 Oracle Insert 테스트 (예시)

빌드한 jdbc connector 로 바꿔치기를 했다면 다음과 같이 오라클의 테이블을 맵핑해서 insert 테스트를 해볼 수 있다. 단, flink sql 에서는 테이블을 맵핑해서 연결하는 구조이기 때문에 oracle에서 사용하는 테이블은 수동으로 생성이 되어있어야 한다. (=예시에서는 test.t_flink_test 테이블이 된다)

Flink SQL> CREATE TABLE t_flink_oracle (
   keyword STRING,
   ymd TIMESTAMP,
   cnt DECIMAL,

   PRIMARY KEY (keyword, ymd) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:oracle:thin:@127.0.0.1:1526:test',
    'table-name' = 'test.t_flink_test',
    'username' = '{아이디}',
    'password' = '{암호}'
);

Flink SQL> INSERT INTO t_flink_oracle VALUES ('키워드', TO_TIMESTAMP('2021-01-02', 'yyyy-MM-dd'), 10);

만약 오류가 난다면 타입을 강하게 체크해서 그런걸수 있으니 타입을 체크해보자. 특히 sql-client.sh 의 콘솔에는 에러메시지가 명확하게 안보일수 있어서 flink dashboard 에서 subtask 의 log를 확인해야 원인을 찾을수 있으니 참고하도록 하자.

반응형
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/12   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30 31
글 보관함