티스토리 뷰

반응형

문제

Flink 에서 기본적으로 제공하는 JDBC Connector 는 Mysql , Derby, PostgreSQL 3개지를 지원한다. 

나는 sql-client.sh 를 통해서 쿼리기반으로 데이터를 다루는걸 자주 이용하는데 카탈로그 등록이 되서 바로 테이블을 접근할수 있어 연동하기 좋은데, 문제는 jsonb 타입을 지원하지 않는다.

추가로 내가 발견한 미지원되는 필드타입은 uuid 와 timestampz 필드이다.

Flink SQL> describe MyTable;
[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Doesn't support Postgres type 'jsonb' yet

Flink SQL> describe  YouTable;
[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Doesn't support Postgres type 'uuid' yet

해결방법

가장 손쉬운건 uuid, json, jsonb 타입을 text 타입처럼 string으로 인지하도록 코드를 수정하는것이다.

내부적으로는 문자열은 LogicalType 타입값이 "VARCHAR" 로 인지되고 리턴되는 값은 String 타입이 기본으로 되어있다. 하지만 우리는 uuid, json, jsonb 타입도 문자열로 조회할 수 있게 했기 때문에, String 타입이 아닌 데이터는 문자열로 강제 변환하여 조회하는 방향으로 수정하면 조회가 가능하다.

 

https://github.com/jungkoo/flink/pull/3

사실, 내가 수정한 풀리퀘스트 링크가 있긴 한데 timestampz 타입의 timezone offset 문제는 해결이 안되었지만 필요한 사람을 위해서 우선 링크도 같이 제공하겠다.

 

PostgresCatalog.java

PostgresRowConverter.java

결과확인

위와 같이 코드를 수정하고 재기동 후 sql-client.sh 에서 쿼리를 실행하면 다음과 같이 정상 동작된다.

-- myJson 필드가 jsob 타입인데 확인가능
Flink SQL> describe MyTable;
+-----------------------+------------------+-------+---------+--------+-----------+
|                  name |             type |  null |     key | extras | watermark |
+-----------------------+------------------+-------+---------+--------+-----------+
|                    id |           BIGINT | false | PRI(id) |        |           |
...생략...
|                myJson |           STRING |  true |         |        |           |
|                  type |           STRING | false |         |        |           |
+-----------------------+------------------+-------+---------+--------+-----------+

-- id 가 uuid 타입인데 확인가능
Flink SQL> describe YouTable;
+-----------------+--------------+-------+-------------+--------+-----------+
|            name |         type |  null |         key | extras | watermark |
+-----------------+--------------+-------+-------------+--------+-----------+
|              id |       STRING | false | PRI(id) |        |           |
|          status |       BIGINT |  true |             |        |           |
...생략...
+-----------------+--------------+-------+-------------+--------+-----------+

참고로 timestampz 타입 문제가 없다면 flink-jdbc-connector 만 빌드해서 lib 폴더에 빌드한걸 바꿔쳐서 실행하면 된다.

요즘에 자잘하게 안되는 기능이 많아서 self 수정하게 되는게 많은것 같다. 나중에 찜찜한 코드도 수정해서 contribute 하는게 좋을것 같다. (받아줄지는 모르겠지만)

반응형
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/11   »
1 2
3 4 5 6 7 8 9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28 29 30
글 보관함