데이터처리/Flink

[Flink] PostgreSQL 연동시 Doesn't support Postgres type 'jsonb' yet 문제

정선생 2022. 1. 26. 08:00
반응형

문제

Flink 에서 기본적으로 제공하는 JDBC Connector 는 Mysql , Derby, PostgreSQL 3개지를 지원한다. 

나는 sql-client.sh 를 통해서 쿼리기반으로 데이터를 다루는걸 자주 이용하는데 카탈로그 등록이 되서 바로 테이블을 접근할수 있어 연동하기 좋은데, 문제는 jsonb 타입을 지원하지 않는다.

추가로 내가 발견한 미지원되는 필드타입은 uuid 와 timestampz 필드이다.

Flink SQL> describe MyTable;
[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Doesn't support Postgres type 'jsonb' yet

Flink SQL> describe  YouTable;
[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Doesn't support Postgres type 'uuid' yet

해결방법

가장 손쉬운건 uuid, json, jsonb 타입을 text 타입처럼 string으로 인지하도록 코드를 수정하는것이다.

내부적으로는 문자열은 LogicalType 타입값이 "VARCHAR" 로 인지되고 리턴되는 값은 String 타입이 기본으로 되어있다. 하지만 우리는 uuid, json, jsonb 타입도 문자열로 조회할 수 있게 했기 때문에, String 타입이 아닌 데이터는 문자열로 강제 변환하여 조회하는 방향으로 수정하면 조회가 가능하다.

 

https://github.com/jungkoo/flink/pull/3

사실, 내가 수정한 풀리퀘스트 링크가 있긴 한데 timestampz 타입의 timezone offset 문제는 해결이 안되었지만 필요한 사람을 위해서 우선 링크도 같이 제공하겠다.

 

PostgresCatalog.java

PostgresRowConverter.java

결과확인

위와 같이 코드를 수정하고 재기동 후 sql-client.sh 에서 쿼리를 실행하면 다음과 같이 정상 동작된다.

-- myJson 필드가 jsob 타입인데 확인가능
Flink SQL> describe MyTable;
+-----------------------+------------------+-------+---------+--------+-----------+
|                  name |             type |  null |     key | extras | watermark |
+-----------------------+------------------+-------+---------+--------+-----------+
|                    id |           BIGINT | false | PRI(id) |        |           |
...생략...
|                myJson |           STRING |  true |         |        |           |
|                  type |           STRING | false |         |        |           |
+-----------------------+------------------+-------+---------+--------+-----------+

-- id 가 uuid 타입인데 확인가능
Flink SQL> describe YouTable;
+-----------------+--------------+-------+-------------+--------+-----------+
|            name |         type |  null |         key | extras | watermark |
+-----------------+--------------+-------+-------------+--------+-----------+
|              id |       STRING | false | PRI(id) |        |           |
|          status |       BIGINT |  true |             |        |           |
...생략...
+-----------------+--------------+-------+-------------+--------+-----------+

참고로 timestampz 타입 문제가 없다면 flink-jdbc-connector 만 빌드해서 lib 폴더에 빌드한걸 바꿔쳐서 실행하면 된다.

요즘에 자잘하게 안되는 기능이 많아서 self 수정하게 되는게 많은것 같다. 나중에 찜찜한 코드도 수정해서 contribute 하는게 좋을것 같다. (받아줄지는 모르겠지만)

반응형